1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::ops::Deref;
19use std::sync::Arc;
20
21use differential_dataflow::consolidation::consolidate;
22use itertools::Itertools;
23use mz_adapter_types::compaction::CompactionWindow;
24use mz_adapter_types::connection::ConnectionId;
25use mz_cluster_client::ReplicaId;
26use mz_compute_client::controller::PeekNotification;
27use mz_compute_client::protocol::command::PeekTarget;
28use mz_compute_client::protocol::response::PeekResponse;
29use mz_compute_types::ComputeInstanceId;
30use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
31use mz_controller_types::ClusterId;
32use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
33use mz_expr::row::RowCollection;
34use mz_expr::{
35 EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
36 RowSetFinishingIncremental, permutation_for_arrangement,
37};
38use mz_ore::cast::CastFrom;
39use mz_ore::str::{StrExt, separated};
40use mz_ore::tracing::OpenTelemetryContext;
41use mz_persist_client::Schemas;
42use mz_persist_types::codec_impls::UnitSchema;
43use mz_repr::explain::text::DisplayText;
44use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
45use mz_repr::{
46 Diff, GlobalId, IntoRowIterator, RelationDesc, RelationType, Row, RowIterator, preserves_order,
47};
48use mz_storage_types::sources::SourceData;
49use serde::{Deserialize, Serialize};
50use timely::progress::{Antichain, Timestamp};
51use uuid::Uuid;
52
53use crate::coord::timestamp_selection::TimestampDetermination;
54use crate::optimize::OptimizerError;
55use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
56use crate::util::ResultExt;
57use crate::{AdapterError, ExecuteContextExtra, ExecuteResponse};
58
/// Coordinator-side bookkeeping for a peek that has been issued but whose
/// outcome has not been recorded yet.
///
/// Entries are inserted into `pending_peeks` by `implement_peek_plan` and
/// removed by `remove_pending_peek` / `cancel_pending_peeks`.
#[derive(Debug)]
pub(crate) struct PendingPeek {
    /// The connection that issued the peek. Also the key under which the peek's
    /// UUID is tracked in `client_pending_peeks`.
    pub(crate) conn_id: ConnectionId,
    /// The compute instance (cluster) the peek was sent to.
    pub(crate) cluster_id: ClusterId,
    /// The set of `GlobalId`s recorded for this peek (populated from
    /// `PlannedPeek::source_ids`).
    pub(crate) depends_on: BTreeSet<GlobalId>,
    /// Execution context that is handed to `retire_execution` once the peek
    /// finishes or is canceled.
    pub(crate) ctx_extra: ExecuteContextExtra,
    /// `true` when the peek reads an existing index or a persist shard,
    /// `false` when a dataflow had to be created for it.
    pub(crate) is_fast_path: bool,
}
74
/// A single item of a peek's response as it is streamed back to the caller.
///
/// Values of this type are yielded by the stream built in
/// `create_peek_response_stream`.
#[derive(Debug)]
pub enum PeekResponseUnary {
    /// A batch of result rows, after the row-set finishing has been applied.
    Rows(Box<dyn RowIterator + Send + Sync>),
    /// The peek (or the finishing of its rows) failed; carries the error message.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
85
/// A slow-path peek plan: a dataflow that must be created before its result
/// can be peeked through an index.
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
    /// The dataflow to create.
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    /// The id of the index that is peeked once the dataflow exists (and that is
    /// dropped again after the peek has been issued).
    pub(crate) id: GlobalId,
    /// The arrangement key, built from the default key of the result type.
    key: Vec<MirScalarExpr>,
    /// Column permutation returned by `permutation_for_arrangement` for `key`.
    permutation: Vec<usize>,
    /// Number of columns in the thinning returned by
    /// `permutation_for_arrangement` for `key`.
    thinned_arity: usize,
}
94
95impl<T> PeekDataflowPlan<T> {
96 pub fn new(
97 desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
98 id: GlobalId,
99 typ: &RelationType,
100 ) -> Self {
101 let arity = typ.arity();
102 let key = typ
103 .default_key()
104 .into_iter()
105 .map(MirScalarExpr::column)
106 .collect::<Vec<_>>();
107 let (permutation, thinning) = permutation_for_arrangement(&key, arity);
108 Self {
109 desc,
110 id,
111 key,
112 permutation,
113 thinned_arity: thinning.len(),
114 }
115 }
116}
117
/// The ways a peek can be answered without creating a new dataflow.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    /// The query is a constant: either its rows (with multiplicities) or an
    /// evaluation error, together with the relation type of the result.
    Constant(Result<Vec<(Row, Diff)>, EvalError>, RelationType),
    /// Peek an existing index. The fields are, in order: the id of the indexed
    /// collection, the id of the index, optional literal constraints (lookup
    /// rows) on the index key, and the map/filter/project to apply to the
    /// index contents.
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    /// Peek a persist shard directly. The fields are, in order: the id of the
    /// collection, an optional literal constraint row, and the
    /// map/filter/project to apply.
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}
131
132impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
133 fn fmt_text(
134 &self,
135 f: &mut fmt::Formatter<'_>,
136 ctx: &mut PlanRenderingContext<'a, T>,
137 ) -> fmt::Result {
138 if ctx.config.verbose_syntax {
139 self.fmt_verbose_text(f, ctx)
140 } else {
141 self.fmt_default_text(f, ctx)
142 }
143 }
144}
145
146impl FastPathPlan {
147 pub fn fmt_default_text<'a, T>(
148 &self,
149 f: &mut fmt::Formatter<'_>,
150 ctx: &mut PlanRenderingContext<'a, T>,
151 ) -> fmt::Result {
152 let mode = HumanizedExplain::new(ctx.config.redacted);
153
154 match self {
155 FastPathPlan::Constant(rows, _) => {
156 write!(f, "{}→Constant ", ctx.indent)?;
157
158 match rows {
159 Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
160 Err(err) => {
161 if mode.redacted() {
162 writeln!(f, "(error: █)")?;
163 } else {
164 writeln!(f, "(error: {})", err.to_string().quoted(),)?;
165 }
166 }
167 }
168 }
169 FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
170 let coll = ctx
171 .humanizer
172 .humanize_id(*coll_id)
173 .unwrap_or_else(|| coll_id.to_string());
174 let idx = ctx
175 .humanizer
176 .humanize_id(*idx_id)
177 .unwrap_or_else(|| idx_id.to_string());
178 writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
179 ctx.indent.set();
180
181 ctx.indent += 1;
182
183 mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
184 let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();
185
186 if printed {
187 ctx.indent += 1;
188 }
189 if let Some(literal_constraints) = literal_constraints {
190 writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
191 ctx.indent += 1;
192 let values = separated("; ", mode.seq(literal_constraints, None));
193 writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
194 } else {
195 writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
196 }
197
198 ctx.indent.reset();
199 }
200 FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
201 let coll = ctx
202 .humanizer
203 .humanize_id(*global_id)
204 .unwrap_or_else(|| global_id.to_string());
205 writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
206 ctx.indent.set();
207
208 ctx.indent += 1;
209
210 mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
211 let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();
212
213 if printed {
214 ctx.indent += 1;
215 }
216 if let Some(literal_constraint) = literal_constraint {
217 writeln!(f, "{}→Index Lookup on {coll} (from storage)", ctx.indent)?;
218 ctx.indent += 1;
219 let value = mode.expr(literal_constraint, None);
220 writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
221 } else {
222 writeln!(f, "{}→Indexed {coll} (from storage)", ctx.indent)?;
223 }
224
225 ctx.indent.reset();
226 }
227 }
228
229 Ok(())
230 }
231
232 pub fn fmt_verbose_text<'a, T>(
233 &self,
234 f: &mut fmt::Formatter<'_>,
235 ctx: &mut PlanRenderingContext<'a, T>,
236 ) -> fmt::Result {
237 let redacted = ctx.config.redacted;
238 let mode = HumanizedExplain::new(redacted);
239
240 match self {
243 FastPathPlan::Constant(Ok(rows), _) => {
244 if !rows.is_empty() {
245 writeln!(f, "{}Constant", ctx.indent)?;
246 *ctx.as_mut() += 1;
247 fmt_text_constant_rows(
248 f,
249 rows.iter().map(|(row, diff)| (row, diff)),
250 ctx.as_mut(),
251 redacted,
252 )?;
253 *ctx.as_mut() -= 1;
254 } else {
255 writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
256 }
257 Ok(())
258 }
259 FastPathPlan::Constant(Err(err), _) => {
260 if redacted {
261 writeln!(f, "{}Error █", ctx.as_mut())
262 } else {
263 writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
264 }
265 }
266 FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
267 ctx.as_mut().set();
268 let (map, filter, project) = mfp.as_map_filter_project();
269
270 let cols = if !ctx.config.humanized_exprs {
271 None
272 } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
273 let cols = itertools::chain(
277 cols.iter().cloned(),
278 std::iter::repeat(String::new()).take(map.len()),
279 )
280 .collect();
281 Some(cols)
282 } else {
283 None
284 };
285
286 if project.len() != mfp.input_arity + map.len()
287 || !project.iter().enumerate().all(|(i, o)| i == *o)
288 {
289 let outputs = mode.seq(&project, cols.as_ref());
290 let outputs = CompactScalars(outputs);
291 writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
292 *ctx.as_mut() += 1;
293 }
294 if !filter.is_empty() {
295 let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
296 writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
297 *ctx.as_mut() += 1;
298 }
299 if !map.is_empty() {
300 let scalars = mode.seq(&map, cols.as_ref());
301 let scalars = CompactScalars(scalars);
302 writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
303 *ctx.as_mut() += 1;
304 }
305 MirRelationExpr::fmt_indexed_filter(
306 f,
307 ctx,
308 coll_id,
309 idx_id,
310 literal_constraints.clone(),
311 None,
312 )?;
313 writeln!(f)?;
314 ctx.as_mut().reset();
315 Ok(())
316 }
317 FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
318 ctx.as_mut().set();
319 let (map, filter, project) = mfp.as_map_filter_project();
320
321 let cols = if !ctx.config.humanized_exprs {
322 None
323 } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
324 let cols = itertools::chain(
325 cols.iter().cloned(),
326 std::iter::repeat(String::new()).take(map.len()),
327 )
328 .collect::<Vec<_>>();
329 Some(cols)
330 } else {
331 None
332 };
333
334 if project.len() != mfp.input_arity + map.len()
335 || !project.iter().enumerate().all(|(i, o)| i == *o)
336 {
337 let outputs = mode.seq(&project, cols.as_ref());
338 let outputs = CompactScalars(outputs);
339 writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
340 *ctx.as_mut() += 1;
341 }
342 if !filter.is_empty() {
343 let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
344 writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
345 *ctx.as_mut() += 1;
346 }
347 if !map.is_empty() {
348 let scalars = mode.seq(&map, cols.as_ref());
349 let scalars = CompactScalars(scalars);
350 writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
351 *ctx.as_mut() += 1;
352 }
353 let human_id = ctx
354 .humanizer
355 .humanize_id(*gid)
356 .unwrap_or_else(|| gid.to_string());
357 write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
358 if let Some(literal) = literal_constraint {
359 let value = mode.expr(literal, None);
360 writeln!(f, " [value={}]", value)?;
361 } else {
362 writeln!(f, "")?;
363 }
364 ctx.as_mut().reset();
365 Ok(())
366 }
367 }?;
368 Ok(())
369 }
370}
371
/// A peek plan together with everything `implement_peek_plan` needs to run it.
#[derive(Debug)]
pub struct PlannedPeek {
    /// How the peek is to be answered (fast path or dataflow).
    pub plan: PeekPlan,
    /// The timestamp determination for the peek; the peek is issued at
    /// `determination.timestamp_context.timestamp_or_default()`.
    pub determination: TimestampDetermination<mz_repr::Timestamp>,
    /// The connection issuing the peek.
    pub conn_id: ConnectionId,
    /// The relation type used to build the `RelationDesc` that is passed along
    /// with the peek (column names are synthesized as `peek_<i>`).
    pub intermediate_result_type: RelationType,
    /// The input arity of the identity map/filter/project that is built for
    /// slow-path peeks.
    pub source_arity: usize,
    /// The ids recorded as `PendingPeek::depends_on` for this peek.
    pub source_ids: BTreeSet<GlobalId>,
}
387
/// The two ways a non-trivial peek can be executed.
#[derive(Clone, Debug)]
pub enum PeekPlan<T = mz_repr::Timestamp> {
    /// Answer the peek without creating a dataflow (constant, existing index,
    /// or persist shard).
    FastPath(FastPathPlan),
    /// Create a dataflow, peek its index, and drop it again.
    SlowPath(PeekDataflowPlan<T>),
}
395
396fn mfp_to_safe_plan(
401 mfp: mz_expr::MapFilterProject,
402) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
403 mfp.into_plan()
404 .map_err(OptimizerError::InternalUnsafeMfpPlan)?
405 .into_nontemporal()
406 .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
407}
408
409fn permute_oneshot_mfp_around_index(
411 mfp: mz_expr::MapFilterProject,
412 key: &[MirScalarExpr],
413) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
414 let input_arity = mfp.input_arity;
415 let mut safe_mfp = mfp_to_safe_plan(mfp)?;
416 let (permute, thinning) = permutation_for_arrangement(key, input_arity);
417 safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
418 Ok(safe_mfp)
419}
420
421pub fn create_fast_path_plan<T: Timestamp>(
427 dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
428 view_id: GlobalId,
429 finishing: Option<&RowSetFinishing>,
430 persist_fast_path_limit: usize,
431 persist_fast_path_order: bool,
432) -> Result<Option<FastPathPlan>, OptimizerError> {
433 if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
439 {
440 let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
441 if let Some((rows, found_typ)) = mir.as_const() {
442 return Ok(Some(FastPathPlan::Constant(
444 rows.clone()
445 .map(|rows| rows.into_iter().map(|(row, diff)| (row, diff)).collect()),
446 found_typ.clone(),
447 )));
448 } else {
449 if let MirRelationExpr::TopK {
452 input,
453 group_key,
454 order_key,
455 limit,
456 offset,
457 monotonic: _,
458 expected_group_size: _,
459 } = mir
460 {
461 if let Some(finishing) = finishing {
462 if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
463 let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
465 (None, _) => true,
466 (Some(..), None) => false,
467 (Some(topk_limit), Some(finishing_limit)) => {
468 if let Some(l) = topk_limit.as_literal_int64() {
469 l >= *finishing_limit
470 } else {
471 false
472 }
473 }
474 };
475 if finishing_limits_at_least_as_topk {
476 mir = input;
477 }
478 }
479 }
480 }
481 let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
485 match mir {
486 MirRelationExpr::Get {
487 id: Id::Global(get_id),
488 typ: relation_typ,
489 ..
490 } => {
491 for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
493 if desc.on_id == *get_id {
494 return Ok(Some(FastPathPlan::PeekExisting(
495 *get_id,
496 *index_id,
497 None,
498 permute_oneshot_mfp_around_index(mfp, &desc.key)?,
499 )));
500 }
501 }
502
503 let safe_mfp = mfp_to_safe_plan(mfp)?;
507 let (_maps, filters, projection) = safe_mfp.as_map_filter_project();
508
509 let literal_constraint = if persist_fast_path_order {
510 let mut row = Row::default();
511 let mut packer = row.packer();
512 for (idx, col) in relation_typ.column_types.iter().enumerate() {
513 if !preserves_order(&col.scalar_type) {
514 break;
515 }
516 let col_expr = MirScalarExpr::column(idx);
517
518 let Some((literal, _)) = filters
519 .iter()
520 .filter_map(|f| f.expr_eq_literal(&col_expr))
521 .next()
522 else {
523 break;
524 };
525 packer.extend_by_row(&literal);
526 }
527 if row.is_empty() { None } else { Some(row) }
528 } else {
529 None
530 };
531
532 let finish_ok = match &finishing {
533 None => false,
534 Some(RowSetFinishing {
535 order_by,
536 limit,
537 offset,
538 ..
539 }) => {
540 let order_ok = if persist_fast_path_order {
541 order_by.iter().enumerate().all(|(idx, order)| {
542 let column_idx = projection[order.column];
545 if column_idx >= safe_mfp.input_arity {
546 return false;
547 }
548 let column_type = &relation_typ.column_types[column_idx];
549 let index_ok = idx == column_idx;
550 let nulls_ok = !column_type.nullable || order.nulls_last;
551 let asc_ok = !order.desc;
552 let type_ok = preserves_order(&column_type.scalar_type);
553 index_ok && nulls_ok && asc_ok && type_ok
554 })
555 } else {
556 order_by.is_empty()
557 };
558 let limit_ok = limit.map_or(false, |l| {
559 usize::cast_from(l) + *offset < persist_fast_path_limit
560 });
561 order_ok && limit_ok
562 }
563 };
564
565 let key_constraint = if let Some(literal) = &literal_constraint {
566 let prefix_len = literal.iter().count();
567 relation_typ
568 .keys
569 .iter()
570 .any(|k| k.iter().all(|idx| *idx < prefix_len))
571 } else {
572 false
573 };
574
575 if key_constraint || (filters.is_empty() && finish_ok) {
579 return Ok(Some(FastPathPlan::PeekPersist(
580 *get_id,
581 literal_constraint,
582 safe_mfp,
583 )));
584 }
585 }
586 MirRelationExpr::Join { implementation, .. } => {
587 if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
588 implementation
589 {
590 return Ok(Some(FastPathPlan::PeekExisting(
591 *coll_id,
592 *idx_id,
593 Some(vals.clone()),
594 permute_oneshot_mfp_around_index(mfp, key)?,
595 )));
596 }
597 }
598 _ => {}
600 }
601 }
602 }
603 Ok(None)
604}
605
606impl FastPathPlan {
607 pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
608 match self {
609 FastPathPlan::Constant(..) => UsedIndexes::default(),
610 FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
611 if literal_constraints.is_some() {
612 UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
613 } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
614 UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
615 } else {
616 UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
617 }
618 }
619 FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
620 }
621 }
622}
623
624impl crate::coord::Coordinator {
    /// Executes a planned peek and returns the response that carries its rows.
    ///
    /// * Constant plans are answered immediately: the rows are consolidated,
    ///   `finishing` is applied, the execution is retired (taking the
    ///   contents of `ctx_extra`), and the rows are returned via
    ///   `send_immediate_rows`.
    /// * For all other plans a peek is issued against `compute_instance`
    ///   (optionally pinned to `target_replica`), the peek is registered in
    ///   `pending_peeks` / `client_pending_peeks` (again taking the contents
    ///   of `ctx_extra`), and a streaming response is returned. For slow-path
    ///   plans the dataflow is created first and its index is dropped again
    ///   right after the peek has been issued.
    ///
    /// `max_result_size` and `max_returned_query_size` are forwarded to the
    /// row-set finishing.
    ///
    /// # Errors
    ///
    /// Returns an error when a constant plan holds an evaluation error, has a
    /// row with negative multiplicity, or fails finishing (for example due to
    /// the size limits), and when the identity map/filter/project for a
    /// slow-path peek cannot be converted into a safe plan.
    #[mz_ore::instrument(level = "debug")]
    pub async fn implement_peek_plan(
        &mut self,
        ctx_extra: &mut ExecuteContextExtra,
        plan: PlannedPeek,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
    ) -> Result<crate::ExecuteResponse, AdapterError> {
        let PlannedPeek {
            plan: fast_path,
            determination,
            conn_id,
            intermediate_result_type,
            source_arity,
            source_ids,
        } = plan;

        // A constant result can be returned right away.
        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
            let mut rows = match rows {
                Ok(rows) => rows,
                Err(e) => return Err(e.into()),
            };
            // Consolidate so that each row carries its total multiplicity.
            consolidate(&mut rows);

            let mut results = Vec::new();
            for (row, count) in rows {
                if count.is_negative() {
                    Err(EvalError::InvalidParameterValue(
                        format!("Negative multiplicity in constant result: {}", count).into(),
                    ))?
                };
                // Rows whose multiplicity is zero are skipped.
                if count.is_positive() {
                    let count = usize::cast_from(
                        u64::try_from(count.into_inner())
                            .expect("known to be positive from check above"),
                    );
                    results.push((
                        row,
                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
                    ));
                }
            }
            let row_collection = RowCollection::new(results, &finishing.order_by);
            let duration_histogram = self.metrics.row_set_finishing_seconds();

            // Apply the finishing and derive the statement-logging outcome
            // from its result.
            let (ret, reason) = match finishing.finish(
                row_collection,
                max_result_size,
                max_returned_query_size,
                &duration_histogram,
            ) {
                Ok((rows, row_size_bytes)) => {
                    let result_size = u64::cast_from(row_size_bytes);
                    let rows_returned = u64::cast_from(rows.count());
                    (
                        Ok(Self::send_immediate_rows(rows)),
                        StatementEndedExecutionReason::Success {
                            result_size: Some(result_size),
                            rows_returned: Some(rows_returned),
                            execution_strategy: Some(StatementExecutionStrategy::Constant),
                        },
                    )
                }
                Err(error) => (
                    Err(AdapterError::ResultSize(error.clone())),
                    StatementEndedExecutionReason::Errored { error },
                ),
            };
            self.retire_execution(reason, std::mem::take(ctx_extra));
            return ret;
        }

        let timestamp = determination.timestamp_context.timestamp_or_default();
        // Record the chosen timestamp for the statement, if there is one to
        // record it for.
        if let Some(id) = ctx_extra.contents() {
            self.set_statement_execution_timestamp(id, timestamp)
        }

        // Every remaining plan results in a peek. Work out, per plan kind:
        // the peek's arguments, the index to drop afterwards (slow path
        // only), whether this counts as a fast path, what to peek, and the
        // strategy reported with the response.
        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
            PeekPlan::FastPath(FastPathPlan::PeekExisting(
                _coll_id,
                idx_id,
                literal_constraints,
                map_filter_project,
            )) => (
                (literal_constraints, timestamp, map_filter_project),
                None,
                true,
                PeekTarget::Index { id: idx_id },
                StatementExecutionStrategy::FastPath,
            ),
            PeekPlan::FastPath(FastPathPlan::PeekPersist(
                coll_id,
                literal_constraint,
                map_filter_project,
            )) => {
                // The peek takes a list of constraint rows; wrap the single one.
                let peek_command = (
                    literal_constraint.map(|r| vec![r]),
                    timestamp,
                    map_filter_project,
                );
                let metadata = self
                    .controller
                    .storage
                    .collection_metadata(coll_id)
                    .expect("storage collection for fast-path peek")
                    .clone();
                (
                    peek_command,
                    None,
                    true,
                    PeekTarget::Persist {
                        id: coll_id,
                        metadata,
                    },
                    StatementExecutionStrategy::PersistFastPath,
                )
            }
            PeekPlan::SlowPath(PeekDataflowPlan {
                desc: dataflow,
                id: index_id,
                key: index_key,
                permutation: index_permutation,
                thinned_arity: index_thinned_arity,
            }) => {
                let output_ids = dataflow.export_ids().collect();

                // Create the dataflow whose index is peeked below.
                self.controller
                    .compute
                    .create_dataflow(compute_instance, dataflow, None)
                    .unwrap_or_terminate("cannot fail to create dataflows");
                self.initialize_compute_read_policies(
                    output_ids,
                    compute_instance,
                    // Compaction is disabled for the dataflow's outputs.
                    CompactionWindow::DisableCompaction,
                )
                .await;

                // Start from an MFP over `source_arity` columns with nothing
                // added, and permute it to match the index's column layout.
                let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
                map_filter_project.permute_fn(
                    |c| index_permutation[c],
                    index_key.len() + index_thinned_arity,
                );
                let map_filter_project = mfp_to_safe_plan(map_filter_project)?;

                (
                    (None, timestamp, map_filter_project),
                    Some(index_id),
                    false,
                    PeekTarget::Index { id: index_id },
                    StatementExecutionStrategy::Standard,
                )
            }
            // The only remaining variant is the constant fast path, which
            // returned above.
            _ => {
                unreachable!()
            }
        };

        // Channel over which the peek's response is delivered.
        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();

        // Pick a UUID that is not used by any currently pending peek.
        let mut uuid = Uuid::new_v4();
        while self.pending_peeks.contains_key(&uuid) {
            uuid = Uuid::new_v4();
        }

        // Register the peek before issuing it.
        self.pending_peeks.insert(
            uuid,
            PendingPeek {
                conn_id: conn_id.clone(),
                cluster_id: compute_instance,
                depends_on: source_ids,
                ctx_extra: std::mem::take(ctx_extra),
                is_fast_path,
            },
        );
        self.client_pending_peeks
            .entry(conn_id)
            .or_default()
            .insert(uuid, compute_instance);
        let (literal_constraints, timestamp, map_filter_project) = peek_command;

        // Only the bare relation type is available here, so column names are
        // synthesized.
        let peek_result_column_names =
            (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
        let peek_result_desc =
            RelationDesc::new(intermediate_result_type, peek_result_column_names);

        self.controller
            .compute
            .peek(
                compute_instance,
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                peek_result_desc,
                finishing.clone(),
                map_filter_project,
                target_replica,
                rows_tx,
            )
            .unwrap_or_terminate("cannot fail to peek");

        let duration_histogram = self.metrics.row_set_finishing_seconds();

        // A dataflow created for this peek is dropped once the peek is issued.
        if let Some(index_id) = drop_dataflow {
            self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
            self.drop_indexes(vec![(compute_instance, index_id)]);
        }

        let persist_client = self.persist_client.clone();
        let peek_stash_read_batch_size_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                .get(self.catalog().system_config().dyncfgs());
        let peek_stash_read_memory_budget_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                .get(self.catalog().system_config().dyncfgs());

        // Turn the eventual peek response into a stream of finished rows.
        let peek_response_stream = Self::create_peek_response_stream(
            rows_rx,
            finishing,
            max_result_size,
            max_returned_query_size,
            duration_histogram,
            persist_client,
            peek_stash_read_batch_size_bytes,
            peek_stash_read_memory_budget_bytes,
        );

        Ok(crate::ExecuteResponse::SendingRowsStreaming {
            rows: Box::pin(peek_response_stream),
            instance_id: compute_instance,
            strategy,
        })
    }
884
885 #[mz_ore::instrument(level = "debug")]
887 fn create_peek_response_stream(
888 rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
889 finishing: RowSetFinishing,
890 max_result_size: u64,
891 max_returned_query_size: Option<u64>,
892 duration_histogram: prometheus::Histogram,
893 mut persist_client: mz_persist_client::PersistClient,
894 peek_stash_read_batch_size_bytes: usize,
895 peek_stash_read_memory_budget_bytes: usize,
896 ) -> impl futures::Stream<Item = PeekResponseUnary> {
897 async_stream::stream!({
898 let result = rows_rx.await;
899
900 let rows = match result {
901 Ok(rows) => rows,
902 Err(e) => {
903 yield PeekResponseUnary::Error(e.to_string());
904 return;
905 }
906 };
907
908 match rows {
909 PeekResponse::Rows(rows) => {
910 match finishing.finish(
911 rows,
912 max_result_size,
913 max_returned_query_size,
914 &duration_histogram,
915 ) {
916 Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
917 Err(e) => yield PeekResponseUnary::Error(e),
918 }
919 }
920 PeekResponse::Stashed(response) => {
921 let response = *response;
922
923 let shard_id = response.shard_id;
924
925 let mut batches = Vec::new();
926 for proto_batch in response.batches.into_iter() {
927 let batch =
928 persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);
929
930 batches.push(batch);
931 }
932 tracing::trace!(?batches, "stashed peek response");
933
934 let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
935 let read_schemas: Schemas<SourceData, ()> = Schemas {
936 id: None,
937 key: Arc::new(response.relation_desc.clone()),
938 val: Arc::new(UnitSchema),
939 };
940
941 let mut row_cursor = persist_client
942 .read_batches_consolidated::<_, _, _, i64>(
943 response.shard_id,
944 as_of,
945 read_schemas,
946 batches,
947 |_stats| true,
948 peek_stash_read_memory_budget_bytes,
949 )
950 .await
951 .expect("invalid usage");
952
953 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
972 mz_ore::task::spawn(|| "read_peek_batches", async move {
973 let result = tx.send(response.inline_rows).await;
983 if result.is_err() {
984 tracing::error!("receiver went away");
985 }
986
987 let mut current_batch = Vec::new();
988 let mut current_batch_size: usize = 0;
989
990 'outer: while let Some(rows) = row_cursor.next().await {
991 for ((key, _val), _ts, diff) in rows {
992 let source_data = key.expect("decoding error");
993
994 let row = source_data
995 .0
996 .expect("we are not sending errors on this code path");
997
998 let diff = usize::try_from(diff)
999 .expect("peek responses cannot have negative diffs");
1000
1001 if diff > 0 {
1002 let diff =
1003 NonZeroUsize::new(diff).expect("checked to be non-zero");
1004 current_batch_size =
1005 current_batch_size.saturating_add(row.byte_len());
1006 current_batch.push((row, diff));
1007 }
1008
1009 if current_batch_size > peek_stash_read_batch_size_bytes {
1010 let result = tx
1016 .send(RowCollection::new(
1017 current_batch.drain(..).collect_vec(),
1018 &[],
1019 ))
1020 .await;
1021 if result.is_err() {
1022 tracing::error!("receiver went away");
1023 break 'outer;
1026 }
1027
1028 current_batch_size = 0;
1029 }
1030 }
1031 }
1032
1033 if current_batch.len() > 0 {
1034 let result = tx.send(RowCollection::new(current_batch, &[])).await;
1035 if result.is_err() {
1036 tracing::error!("receiver went away");
1037 }
1038 }
1039
1040 let batches = row_cursor.into_lease();
1041 tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
1042 for batch in batches {
1043 batch.delete().await;
1044 }
1045 });
1046
1047 assert!(
1048 finishing.is_streamable(response.relation_desc.arity()),
1049 "can only get stashed responses when the finishing is streamable"
1050 );
1051
1052 tracing::trace!("query result is streamable!");
1053
1054 assert!(finishing.is_streamable(response.relation_desc.arity()));
1055 let mut incremental_finishing = RowSetFinishingIncremental::new(
1056 finishing.offset,
1057 finishing.limit,
1058 finishing.project,
1059 max_returned_query_size,
1060 );
1061
1062 let mut got_zero_rows = true;
1063 while let Some(rows) = rx.recv().await {
1064 got_zero_rows = false;
1065
1066 let result_rows = incremental_finishing.finish_incremental(
1067 rows,
1068 max_result_size,
1069 &duration_histogram,
1070 );
1071
1072 match result_rows {
1073 Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
1074 Err(e) => yield PeekResponseUnary::Error(e),
1075 }
1076 }
1077
1078 if got_zero_rows {
1081 let row_iter = vec![].into_row_iter();
1082 yield PeekResponseUnary::Rows(Box::new(row_iter));
1083 }
1084 }
1085 PeekResponse::Canceled => {
1086 yield PeekResponseUnary::Canceled;
1087 }
1088 PeekResponse::Error(e) => {
1089 yield PeekResponseUnary::Error(e);
1090 }
1091 }
1092 })
1093 }
1094
1095 #[mz_ore::instrument(level = "debug")]
1097 pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
1098 if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
1099 self.metrics
1100 .canceled_peeks
1101 .with_label_values(&[])
1102 .inc_by(u64::cast_from(uuids.len()));
1103
1104 let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
1105 for (uuid, compute_instance) in &uuids {
1106 inverse.entry(*compute_instance).or_default().insert(*uuid);
1107 }
1108 for (compute_instance, uuids) in inverse {
1109 for uuid in uuids {
1114 let _ = self.controller.compute.cancel_peek(
1115 compute_instance,
1116 uuid,
1117 PeekResponse::Canceled,
1118 );
1119 }
1120 }
1121
1122 let peeks = uuids
1123 .iter()
1124 .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
1125 .collect::<Vec<_>>();
1126 for peek in peeks {
1127 self.retire_execution(StatementEndedExecutionReason::Canceled, peek.ctx_extra);
1128 }
1129 }
1130 }
1131
1132 pub(crate) fn handle_peek_notification(
1135 &mut self,
1136 uuid: Uuid,
1137 notification: PeekNotification,
1138 otel_ctx: OpenTelemetryContext,
1139 ) {
1140 if let Some(PendingPeek {
1143 conn_id: _,
1144 cluster_id: _,
1145 depends_on: _,
1146 ctx_extra,
1147 is_fast_path,
1148 }) = self.remove_pending_peek(&uuid)
1149 {
1150 let reason = match notification {
1151 PeekNotification::Success {
1152 rows: num_rows,
1153 result_size,
1154 } => {
1155 let strategy = if is_fast_path {
1156 StatementExecutionStrategy::FastPath
1157 } else {
1158 StatementExecutionStrategy::Standard
1159 };
1160 StatementEndedExecutionReason::Success {
1161 result_size: Some(result_size),
1162 rows_returned: Some(num_rows),
1163 execution_strategy: Some(strategy),
1164 }
1165 }
1166 PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
1167 PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
1168 };
1169 otel_ctx.attach_as_parent();
1170 self.retire_execution(reason, ctx_extra);
1171 }
1172 }
1175
1176 pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
1178 let pending_peek = self.pending_peeks.remove(uuid);
1179 if let Some(pending_peek) = &pending_peek {
1180 let uuids = self
1181 .client_pending_peeks
1182 .get_mut(&pending_peek.conn_id)
1183 .expect("coord peek state is inconsistent");
1184 uuids.remove(uuid);
1185 if uuids.is_empty() {
1186 self.client_pending_peeks.remove(&pending_peek.conn_id);
1187 }
1188 }
1189 pending_peek
1190 }
1191
1192 pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
1196 where
1197 I: IntoRowIterator,
1198 I::Iter: Send + Sync + 'static,
1199 {
1200 let rows = Box::new(rows.into_row_iter());
1201 ExecuteResponse::SendingRowsImmediate { rows }
1202 }
1203}
1204
#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{ColumnType, Datum, ScalarType};

    use super::*;

    /// Checks the verbose text rendering of each kind of `FastPathPlan`
    /// exercised here: a constant error, index peeks with and without lookup
    /// values, and constants with few and with many rows.
    #[mz_ore::test]
    // Ignored when running under Miri.
    #[cfg_attr(miri, ignore)]
    fn test_fast_path_plan_as_text() {
        let typ = RelationType::new(vec![ColumnType {
            scalar_type: ScalarType::String,
            nullable: false,
        }]);
        let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
        // An index peek without literal constraints, with a map and a projection.
        let no_lookup = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            MapFilterProject::new(4)
                .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
                .project([1, 4])
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );
        // An index peek with a single lookup value and a filter.
        let lookup = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            MapFilterProject::new(3)
                .filter(Some(
                    MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
                ))
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );

        let humanizer = DummyHumanizer;
        // Verbose syntax, not redacted; everything else at its default.
        let config = ExplainConfig {
            redacted: false,
            verbose_syntax: true,
            ..Default::default()
        };
        // Produces a fresh rendering context for each rendered plan.
        let ctx_gen = || {
            let indent = Indent::default();
            let annotations = BTreeMap::new();
            PlanRenderingContext::<FastPathPlan>::new(indent, &humanizer, annotations, &config)
        };

        let constant_err_exp = "Error \"division by zero\"\n";
        let no_lookup_exp = "Project (#1, #4)\n Map ((#0 OR #2))\n ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
        let lookup_exp =
            "Filter (#0) IS NULL\n ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";

        assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
        assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
        assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);

        // A constant with three distinct rows is printed in full.
        let mut constant_rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let constant_exp1 =
            "Constant\n - (\"hello\")\n - ((\"world\") x 2)\n - ((\"star\") x 500)\n";
        assert_eq!(
            text_string_at(
                &FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
                ctx_gen
            ),
            constant_exp1
        );
        // With twenty more rows the expected output carries a total row count
        // and only the first rows.
        constant_rows
            .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
        let constant_exp2 = "Constant\n total_rows (diffs absed): 523\n first_rows:\n - (\"hello\")\
        \n - ((\"world\") x 2)\n - ((\"star\") x 500)\n - (\"0\")\n - (\"1\")\
        \n - (\"2\")\n - (\"3\")\n - (\"4\")\n - (\"5\")\n - (\"6\")\
        \n - (\"7\")\n - (\"8\")\n - (\"9\")\n - (\"10\")\n - (\"11\")\
        \n - (\"12\")\n - (\"13\")\n - (\"14\")\n - (\"15\")\n - (\"16\")\n";
        assert_eq!(
            text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
            constant_exp2
        );
    }
}