1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::ops::Deref;
19use std::sync::Arc;
20
21use differential_dataflow::consolidation::consolidate;
22use itertools::Itertools;
23use mz_adapter_types::compaction::CompactionWindow;
24use mz_adapter_types::connection::ConnectionId;
25use mz_cluster_client::ReplicaId;
26use mz_compute_client::controller::PeekNotification;
27use mz_compute_client::protocol::command::PeekTarget;
28use mz_compute_client::protocol::response::PeekResponse;
29use mz_compute_types::ComputeInstanceId;
30use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
31use mz_controller_types::ClusterId;
32use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
33use mz_expr::row::RowCollection;
34use mz_expr::{
35 EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
36 RowSetFinishingIncremental, permutation_for_arrangement,
37};
38use mz_ore::cast::CastFrom;
39use mz_ore::str::{StrExt, separated};
40use mz_ore::task;
41use mz_ore::tracing::OpenTelemetryContext;
42use mz_persist_client::Schemas;
43use mz_persist_types::codec_impls::UnitSchema;
44use mz_repr::explain::text::DisplayText;
45use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
46use mz_repr::{
47 Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, SqlRelationType,
48 preserves_order,
49};
50use mz_storage_types::sources::SourceData;
51use serde::{Deserialize, Serialize};
52use timely::progress::{Antichain, Timestamp};
53use tokio::sync::oneshot;
54use tracing::{Instrument, Span};
55use uuid::Uuid;
56
57use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
58use crate::coord::timestamp_selection::TimestampDetermination;
59use crate::optimize::OptimizerError;
60use crate::statement_logging::WatchSetCreation;
61use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
62use crate::util::ResultExt;
63use crate::{AdapterError, ExecuteContextGuard, ExecuteResponse};
64
/// Bookkeeping the coordinator keeps for a peek that has been issued but not yet retired.
///
/// Entries live in the coordinator's `pending_peeks` map, keyed by the peek's UUID, and are
/// removed again when the peek is canceled or a notification for it is handled.
#[derive(Debug)]
pub(crate) struct PendingPeek {
    /// The connection that issued the peek; also the key under which the peek's UUID is
    /// tracked in `client_pending_peeks`.
    pub(crate) conn_id: ConnectionId,
    /// The cluster (compute instance) the peek was issued against.
    pub(crate) cluster_id: ClusterId,
    /// The IDs this peek depends on (filled from the planned peek's `source_ids`).
    pub(crate) depends_on: BTreeSet<GlobalId>,
    /// Statement-execution context; defused and handed to `retire_execution` when the peek
    /// ends.
    pub(crate) ctx_extra: ExecuteContextGuard,
    /// `true` for peeks against an existing index or persist, `false` for peeks that had to
    /// build a dataflow first.
    pub(crate) is_fast_path: bool,
}
80
/// One item of the response stream produced for a peek.
#[derive(Debug)]
pub enum PeekResponseUnary {
    /// A batch of result rows to which the row-set finishing has already been applied.
    Rows(Box<dyn RowIterator + Send + Sync>),
    /// The peek failed; the payload is the error message.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
91
/// A slow-path peek plan: a dataflow that must be created before its output can be peeked.
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
    /// The dataflow to create on the compute instance.
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    /// The ID that is peeked as an index once the dataflow exists, and that is dropped again
    /// after the peek has been issued.
    pub(crate) id: GlobalId,
    /// Key of the arrangement being peeked: the column expressions of the result type's
    /// default key.
    key: Vec<MirScalarExpr>,
    /// Column permutation computed by `permutation_for_arrangement` for `key`.
    permutation: Vec<usize>,
    /// Number of columns in the thinning computed by `permutation_for_arrangement`.
    thinned_arity: usize,
}
100
101impl<T> PeekDataflowPlan<T> {
102 pub fn new(
103 desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
104 id: GlobalId,
105 typ: &SqlRelationType,
106 ) -> Self {
107 let arity = typ.arity();
108 let key = typ
109 .default_key()
110 .into_iter()
111 .map(MirScalarExpr::column)
112 .collect::<Vec<_>>();
113 let (permutation, thinning) = permutation_for_arrangement(&key, arity);
114 Self {
115 desc,
116 id,
117 key,
118 permutation,
119 thinned_arity: thinning.len(),
120 }
121 }
122}
123
/// The ways a peek can be answered without building a new dataflow.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    /// The result is a constant: either a list of `(row, diff)` pairs or an evaluation error,
    /// together with the relation type of the result.
    Constant(Result<Vec<(Row, Diff)>, EvalError>, SqlRelationType),
    /// Peek an existing index. Fields, in order: the ID of the indexed collection, the ID of
    /// the index, optional literal values to look up in the index, and the
    /// map/filter/project to apply to the results.
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    /// Peek a collection directly from persist. Fields, in order: the ID of the collection,
    /// an optional literal row constraining the lookup, and the map/filter/project to apply.
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}
137
138impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
139 fn fmt_text(
140 &self,
141 f: &mut fmt::Formatter<'_>,
142 ctx: &mut PlanRenderingContext<'a, T>,
143 ) -> fmt::Result {
144 if ctx.config.verbose_syntax {
145 self.fmt_verbose_text(f, ctx)
146 } else {
147 self.fmt_default_text(f, ctx)
148 }
149 }
150}
151
152impl FastPathPlan {
153 pub fn fmt_default_text<'a, T>(
154 &self,
155 f: &mut fmt::Formatter<'_>,
156 ctx: &mut PlanRenderingContext<'a, T>,
157 ) -> fmt::Result {
158 let mode = HumanizedExplain::new(ctx.config.redacted);
159
160 match self {
161 FastPathPlan::Constant(rows, _) => {
162 write!(f, "{}→Constant ", ctx.indent)?;
163
164 match rows {
165 Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
166 Err(err) => {
167 if mode.redacted() {
168 writeln!(f, "(error: █)")?;
169 } else {
170 writeln!(f, "(error: {})", err.to_string().quoted(),)?;
171 }
172 }
173 }
174 }
175 FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
176 let coll = ctx
177 .humanizer
178 .humanize_id(*coll_id)
179 .unwrap_or_else(|| coll_id.to_string());
180 let idx = ctx
181 .humanizer
182 .humanize_id(*idx_id)
183 .unwrap_or_else(|| idx_id.to_string());
184 writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
185 ctx.indent.set();
186
187 ctx.indent += 1;
188
189 mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
190 let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();
191
192 if printed {
193 ctx.indent += 1;
194 }
195 if let Some(literal_constraints) = literal_constraints {
196 writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
197 ctx.indent += 1;
198 let values = separated("; ", mode.seq(literal_constraints, None));
199 writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
200 } else {
201 writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
202 }
203
204 ctx.indent.reset();
205 }
206 FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
207 let coll = ctx
208 .humanizer
209 .humanize_id(*global_id)
210 .unwrap_or_else(|| global_id.to_string());
211 writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
212 ctx.indent.set();
213
214 ctx.indent += 1;
215
216 mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
217 let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();
218
219 if printed {
220 ctx.indent += 1;
221 }
222 if let Some(literal_constraint) = literal_constraint {
223 writeln!(f, "{}→Index Lookup on {coll} (from storage)", ctx.indent)?;
224 ctx.indent += 1;
225 let value = mode.expr(literal_constraint, None);
226 writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
227 } else {
228 writeln!(f, "{}→Indexed {coll} (from storage)", ctx.indent)?;
229 }
230
231 ctx.indent.reset();
232 }
233 }
234
235 Ok(())
236 }
237
238 pub fn fmt_verbose_text<'a, T>(
239 &self,
240 f: &mut fmt::Formatter<'_>,
241 ctx: &mut PlanRenderingContext<'a, T>,
242 ) -> fmt::Result {
243 let redacted = ctx.config.redacted;
244 let mode = HumanizedExplain::new(redacted);
245
246 match self {
249 FastPathPlan::Constant(Ok(rows), _) => {
250 if !rows.is_empty() {
251 writeln!(f, "{}Constant", ctx.indent)?;
252 *ctx.as_mut() += 1;
253 fmt_text_constant_rows(
254 f,
255 rows.iter().map(|(row, diff)| (row, diff)),
256 ctx.as_mut(),
257 redacted,
258 )?;
259 *ctx.as_mut() -= 1;
260 } else {
261 writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
262 }
263 Ok(())
264 }
265 FastPathPlan::Constant(Err(err), _) => {
266 if redacted {
267 writeln!(f, "{}Error █", ctx.as_mut())
268 } else {
269 writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
270 }
271 }
272 FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
273 ctx.as_mut().set();
274 let (map, filter, project) = mfp.as_map_filter_project();
275
276 let cols = if !ctx.config.humanized_exprs {
277 None
278 } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
279 let cols = itertools::chain(
283 cols.iter().cloned(),
284 std::iter::repeat(String::new()).take(map.len()),
285 )
286 .collect();
287 Some(cols)
288 } else {
289 None
290 };
291
292 if project.len() != mfp.input_arity + map.len()
293 || !project.iter().enumerate().all(|(i, o)| i == *o)
294 {
295 let outputs = mode.seq(&project, cols.as_ref());
296 let outputs = CompactScalars(outputs);
297 writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
298 *ctx.as_mut() += 1;
299 }
300 if !filter.is_empty() {
301 let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
302 writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
303 *ctx.as_mut() += 1;
304 }
305 if !map.is_empty() {
306 let scalars = mode.seq(&map, cols.as_ref());
307 let scalars = CompactScalars(scalars);
308 writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
309 *ctx.as_mut() += 1;
310 }
311 MirRelationExpr::fmt_indexed_filter(
312 f,
313 ctx,
314 coll_id,
315 idx_id,
316 literal_constraints.clone(),
317 None,
318 )?;
319 writeln!(f)?;
320 ctx.as_mut().reset();
321 Ok(())
322 }
323 FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
324 ctx.as_mut().set();
325 let (map, filter, project) = mfp.as_map_filter_project();
326
327 let cols = if !ctx.config.humanized_exprs {
328 None
329 } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
330 let cols = itertools::chain(
331 cols.iter().cloned(),
332 std::iter::repeat(String::new()).take(map.len()),
333 )
334 .collect::<Vec<_>>();
335 Some(cols)
336 } else {
337 None
338 };
339
340 if project.len() != mfp.input_arity + map.len()
341 || !project.iter().enumerate().all(|(i, o)| i == *o)
342 {
343 let outputs = mode.seq(&project, cols.as_ref());
344 let outputs = CompactScalars(outputs);
345 writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
346 *ctx.as_mut() += 1;
347 }
348 if !filter.is_empty() {
349 let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
350 writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
351 *ctx.as_mut() += 1;
352 }
353 if !map.is_empty() {
354 let scalars = mode.seq(&map, cols.as_ref());
355 let scalars = CompactScalars(scalars);
356 writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
357 *ctx.as_mut() += 1;
358 }
359 let human_id = ctx
360 .humanizer
361 .humanize_id(*gid)
362 .unwrap_or_else(|| gid.to_string());
363 write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
364 if let Some(literal) = literal_constraint {
365 let value = mode.expr(literal, None);
366 writeln!(f, " [value={}]", value)?;
367 } else {
368 writeln!(f, "")?;
369 }
370 ctx.as_mut().reset();
371 Ok(())
372 }
373 }?;
374 Ok(())
375 }
376}
377
/// Everything `implement_peek_plan` needs to execute a peek.
#[derive(Debug)]
pub struct PlannedPeek {
    /// How the peek is to be answered (fast path or slow path).
    pub plan: PeekPlan,
    /// The timestamp determination; the peek timestamp is taken from its
    /// `timestamp_context`.
    pub determination: TimestampDetermination<mz_repr::Timestamp>,
    /// The connection that issued the peek.
    pub conn_id: ConnectionId,
    /// The relation type used to describe the peek's results (columns are named
    /// `peek_{i}`).
    pub intermediate_result_type: SqlRelationType,
    /// Input arity of the identity map/filter/project that slow-path peeks apply to the
    /// freshly built arrangement.
    pub source_arity: usize,
    /// The IDs the peek depends on; recorded as the pending peek's `depends_on`.
    pub source_ids: BTreeSet<GlobalId>,
}
393
/// The two ways a peek can be executed.
#[derive(Clone, Debug)]
pub enum PeekPlan<T = mz_repr::Timestamp> {
    /// Answer the peek from a constant, an existing index, or persist.
    FastPath(FastPathPlan),
    /// Build a dataflow and peek its output.
    SlowPath(PeekDataflowPlan<T>),
}
401
402fn mfp_to_safe_plan(
407 mfp: mz_expr::MapFilterProject,
408) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
409 mfp.into_plan()
410 .map_err(OptimizerError::InternalUnsafeMfpPlan)?
411 .into_nontemporal()
412 .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
413}
414
415fn permute_oneshot_mfp_around_index(
417 mfp: mz_expr::MapFilterProject,
418 key: &[MirScalarExpr],
419) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
420 let input_arity = mfp.input_arity;
421 let mut safe_mfp = mfp_to_safe_plan(mfp)?;
422 let (permute, thinning) = permutation_for_arrangement(key, input_arity);
423 safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
424 Ok(safe_mfp)
425}
426
427pub fn create_fast_path_plan<T: Timestamp>(
433 dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
434 view_id: GlobalId,
435 finishing: Option<&RowSetFinishing>,
436 persist_fast_path_limit: usize,
437 persist_fast_path_order: bool,
438) -> Result<Option<FastPathPlan>, OptimizerError> {
439 if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
445 {
446 let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
447 if let Some((rows, found_typ)) = mir.as_const() {
448 let plan = FastPathPlan::Constant(rows.clone(), found_typ.clone());
450 return Ok(Some(plan));
451 } else {
452 if let MirRelationExpr::TopK {
455 input,
456 group_key,
457 order_key,
458 limit,
459 offset,
460 monotonic: _,
461 expected_group_size: _,
462 } = mir
463 {
464 if let Some(finishing) = finishing {
465 if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
466 let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
468 (None, _) => true,
469 (Some(..), None) => false,
470 (Some(topk_limit), Some(finishing_limit)) => {
471 if let Some(l) = topk_limit.as_literal_int64() {
472 l >= *finishing_limit
473 } else {
474 false
475 }
476 }
477 };
478 if finishing_limits_at_least_as_topk {
479 mir = input;
480 }
481 }
482 }
483 }
484 let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
488 match mir {
489 MirRelationExpr::Get {
490 id: Id::Global(get_id),
491 typ: relation_typ,
492 ..
493 } => {
494 for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
496 if desc.on_id == *get_id {
497 return Ok(Some(FastPathPlan::PeekExisting(
498 *get_id,
499 *index_id,
500 None,
501 permute_oneshot_mfp_around_index(mfp, &desc.key)?,
502 )));
503 }
504 }
505
506 let safe_mfp = mfp_to_safe_plan(mfp)?;
510 let (_maps, filters, projection) = safe_mfp.as_map_filter_project();
511
512 let literal_constraint = if persist_fast_path_order {
513 let mut row = Row::default();
514 let mut packer = row.packer();
515 for (idx, col) in relation_typ.column_types.iter().enumerate() {
516 if !preserves_order(&col.scalar_type) {
517 break;
518 }
519 let col_expr = MirScalarExpr::column(idx);
520
521 let Some((literal, _)) = filters
522 .iter()
523 .filter_map(|f| f.expr_eq_literal(&col_expr))
524 .next()
525 else {
526 break;
527 };
528 packer.extend_by_row(&literal);
529 }
530 if row.is_empty() { None } else { Some(row) }
531 } else {
532 None
533 };
534
535 let finish_ok = match &finishing {
536 None => false,
537 Some(RowSetFinishing {
538 order_by,
539 limit,
540 offset,
541 ..
542 }) => {
543 let order_ok = if persist_fast_path_order {
544 order_by.iter().enumerate().all(|(idx, order)| {
545 let column_idx = projection[order.column];
548 if column_idx >= safe_mfp.input_arity {
549 return false;
550 }
551 let column_type = &relation_typ.column_types[column_idx];
552 let index_ok = idx == column_idx;
553 let nulls_ok = !column_type.nullable || order.nulls_last;
554 let asc_ok = !order.desc;
555 let type_ok = preserves_order(&column_type.scalar_type);
556 index_ok && nulls_ok && asc_ok && type_ok
557 })
558 } else {
559 order_by.is_empty()
560 };
561 let limit_ok = limit.map_or(false, |l| {
562 usize::cast_from(l) + *offset < persist_fast_path_limit
563 });
564 order_ok && limit_ok
565 }
566 };
567
568 let key_constraint = if let Some(literal) = &literal_constraint {
569 let prefix_len = literal.iter().count();
570 relation_typ
571 .keys
572 .iter()
573 .any(|k| k.iter().all(|idx| *idx < prefix_len))
574 } else {
575 false
576 };
577
578 if key_constraint || (filters.is_empty() && finish_ok) {
582 return Ok(Some(FastPathPlan::PeekPersist(
583 *get_id,
584 literal_constraint,
585 safe_mfp,
586 )));
587 }
588 }
589 MirRelationExpr::Join { implementation, .. } => {
590 if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
591 implementation
592 {
593 return Ok(Some(FastPathPlan::PeekExisting(
594 *coll_id,
595 *idx_id,
596 Some(vals.clone()),
597 permute_oneshot_mfp_around_index(mfp, key)?,
598 )));
599 }
600 }
601 _ => {}
603 }
604 }
605 }
606 Ok(None)
607}
608
609impl FastPathPlan {
610 pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
611 match self {
612 FastPathPlan::Constant(..) => UsedIndexes::default(),
613 FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
614 if literal_constraints.is_some() {
615 UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
616 } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
617 UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
618 } else {
619 UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
620 }
621 }
622 FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
623 }
624 }
625}
626
627impl crate::coord::Coordinator {
/// Executes a planned peek and returns the response to send to the client.
///
/// Constant fast-path plans are finished right here and returned as immediate rows. For
/// all other plans the peek is registered as pending, issued to the compute controller, and
/// a streaming response is returned. Slow-path plans first create their dataflow, which is
/// dropped again once the peek has been issued.
///
/// On the paths that complete or register the peek, `ctx_extra` is taken (left in its
/// default state); on early error returns it is left untouched.
#[mz_ore::instrument(level = "debug")]
pub async fn implement_peek_plan(
    &mut self,
    ctx_extra: &mut ExecuteContextGuard,
    plan: PlannedPeek,
    finishing: RowSetFinishing,
    compute_instance: ComputeInstanceId,
    target_replica: Option<ReplicaId>,
    max_result_size: u64,
    max_returned_query_size: Option<u64>,
) -> Result<ExecuteResponse, AdapterError> {
    let PlannedPeek {
        plan: fast_path,
        determination,
        conn_id,
        intermediate_result_type,
        source_arity,
        source_ids,
    } = plan;

    // Constant results never reach the compute layer: consolidate, finish, and return.
    if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
        let mut rows = match rows {
            Ok(rows) => rows,
            Err(e) => return Err(e.into()),
        };
        consolidate(&mut rows);

        let mut results = Vec::new();
        for (row, count) in rows {
            // A negative multiplicity after consolidation is reported as an error.
            if count.is_negative() {
                Err(EvalError::InvalidParameterValue(
                    format!("Negative multiplicity in constant result: {}", count).into(),
                ))?
            };
            if count.is_positive() {
                let count = usize::cast_from(
                    u64::try_from(count.into_inner())
                        .expect("known to be positive from check above"),
                );
                results.push((
                    row,
                    NonZeroUsize::new(count).expect("known to be non-zero from check above"),
                ));
            }
        }
        let row_collection = RowCollection::new(results, &finishing.order_by);
        let duration_histogram = self.metrics.row_set_finishing_seconds();

        let (ret, reason) = match finishing.finish(
            row_collection,
            max_result_size,
            max_returned_query_size,
            &duration_histogram,
        ) {
            Ok((rows, row_size_bytes)) => {
                let result_size = u64::cast_from(row_size_bytes);
                let rows_returned = u64::cast_from(rows.count());
                (
                    Ok(Self::send_immediate_rows(rows)),
                    StatementEndedExecutionReason::Success {
                        result_size: Some(result_size),
                        rows_returned: Some(rows_returned),
                        execution_strategy: Some(StatementExecutionStrategy::Constant),
                    },
                )
            }
            Err(error) => (
                Err(AdapterError::ResultSize(error.clone())),
                StatementEndedExecutionReason::Errored { error },
            ),
        };
        // The statement ends here, so retire it with the outcome computed above.
        self.retire_execution(reason, std::mem::take(ctx_extra).defuse());
        return ret;
    }

    let timestamp = determination.timestamp_context.timestamp_or_default();
    if let Some(id) = ctx_extra.contents() {
        self.set_statement_execution_timestamp(id, timestamp)
    }

    // Work out the peek command, the dataflow to drop afterwards (slow path only), whether
    // this is a fast-path peek, the peek target, and the execution strategy.
    let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
        PeekPlan::FastPath(FastPathPlan::PeekExisting(
            _coll_id,
            idx_id,
            literal_constraints,
            map_filter_project,
        )) => (
            (literal_constraints, timestamp, map_filter_project),
            None,
            true,
            PeekTarget::Index { id: idx_id },
            StatementExecutionStrategy::FastPath,
        ),
        PeekPlan::FastPath(FastPathPlan::PeekPersist(
            coll_id,
            literal_constraint,
            map_filter_project,
        )) => {
            // The persist constraint is a single row; the peek command takes a list.
            let peek_command = (
                literal_constraint.map(|r| vec![r]),
                timestamp,
                map_filter_project,
            );
            let metadata = self
                .controller
                .storage
                .collection_metadata(coll_id)
                .expect("storage collection for fast-path peek")
                .clone();
            (
                peek_command,
                None,
                true,
                PeekTarget::Persist {
                    id: coll_id,
                    metadata,
                },
                StatementExecutionStrategy::PersistFastPath,
            )
        }
        PeekPlan::SlowPath(PeekDataflowPlan {
            desc: dataflow,
            id: index_id,
            key: index_key,
            permutation: index_permutation,
            thinned_arity: index_thinned_arity,
        }) => {
            let output_ids = dataflow.export_ids().collect();

            // Create the dataflow whose output will be peeked.
            self.controller
                .compute
                .create_dataflow(compute_instance, dataflow, None)
                .map_err(
                    AdapterError::concurrent_dependency_drop_from_dataflow_creation_error,
                )?;
            self.initialize_compute_read_policies(
                output_ids,
                compute_instance,
                CompactionWindow::DisableCompaction,
            )
            .await;

            // Start from an identity MFP over the source columns and permute it to match
            // the layout of the arrangement.
            let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
            map_filter_project.permute_fn(
                |c| index_permutation[c],
                index_key.len() + index_thinned_arity,
            );
            let map_filter_project = mfp_to_safe_plan(map_filter_project)?;

            (
                (None, timestamp, map_filter_project),
                Some(index_id),
                false,
                PeekTarget::Index { id: index_id },
                StatementExecutionStrategy::Standard,
            )
        }
        // The only remaining variant, a constant fast path, returned early above.
        _ => {
            unreachable!()
        }
    };

    let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();

    // Pick a UUID that is not already used by another pending peek.
    let mut uuid = Uuid::new_v4();
    while self.pending_peeks.contains_key(&uuid) {
        uuid = Uuid::new_v4();
    }

    // Register the peek both by UUID and under its connection.
    self.pending_peeks.insert(
        uuid,
        PendingPeek {
            conn_id: conn_id.clone(),
            cluster_id: compute_instance,
            depends_on: source_ids,
            ctx_extra: std::mem::take(ctx_extra),
            is_fast_path,
        },
    );
    self.client_pending_peeks
        .entry(conn_id)
        .or_default()
        .insert(uuid, compute_instance);
    let (literal_constraints, timestamp, map_filter_project) = peek_command;

    // Describe the peek result using synthetic column names `peek_0`, `peek_1`, ...
    let peek_result_column_names =
        (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
    let peek_result_desc =
        RelationDesc::new(intermediate_result_type, peek_result_column_names);

    self.controller
        .compute
        .peek(
            compute_instance,
            peek_target,
            literal_constraints,
            uuid,
            timestamp,
            peek_result_desc,
            finishing.clone(),
            map_filter_project,
            target_replica,
            rows_tx,
        )
        .unwrap_or_terminate("cannot fail to peek");

    let duration_histogram = self.metrics.row_set_finishing_seconds();

    // For slow-path peeks, drop the temporary dataflow now that the peek has been issued.
    if let Some(index_id) = drop_dataflow {
        self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
        self.drop_compute_collections(vec![(compute_instance, index_id)]);
    }

    let persist_client = self.persist_client.clone();
    let peek_stash_read_batch_size_bytes =
        mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
            .get(self.catalog().system_config().dyncfgs());
    let peek_stash_read_memory_budget_bytes =
        mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
            .get(self.catalog().system_config().dyncfgs());

    let peek_response_stream = Self::create_peek_response_stream(
        rows_rx,
        finishing,
        max_result_size,
        max_returned_query_size,
        duration_histogram,
        persist_client,
        peek_stash_read_batch_size_bytes,
        peek_stash_read_memory_budget_bytes,
    );

    Ok(crate::ExecuteResponse::SendingRowsStreaming {
        rows: Box::pin(peek_response_stream),
        instance_id: compute_instance,
        strategy,
    })
}
889
/// Turns the eventual `PeekResponse` received on `rows_rx` into a stream of
/// `PeekResponseUnary` items.
///
/// * `PeekResponse::Rows` is finished in one go and yields a single item.
/// * `PeekResponse::Stashed` is read back from persist by a spawned task and finished
///   incrementally, yielding one item per chunk of rows (or a single empty `Rows` item if
///   no chunk arrives).
/// * `PeekResponse::Canceled` and `PeekResponse::Error` are passed through.
///
/// If the sending side of `rows_rx` is dropped, a single `Error` item is yielded.
///
/// # Panics
///
/// Panics if a stashed response arrives although `finishing` is not streamable.
#[mz_ore::instrument(level = "debug")]
pub(crate) fn create_peek_response_stream(
    rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
    finishing: RowSetFinishing,
    max_result_size: u64,
    max_returned_query_size: Option<u64>,
    duration_histogram: prometheus::Histogram,
    mut persist_client: mz_persist_client::PersistClient,
    peek_stash_read_batch_size_bytes: usize,
    peek_stash_read_memory_budget_bytes: usize,
) -> impl futures::Stream<Item = PeekResponseUnary> {
    async_stream::stream!({
        let result = rows_rx.await;

        let rows = match result {
            Ok(rows) => rows,
            Err(e) => {
                yield PeekResponseUnary::Error(e.to_string());
                return;
            }
        };

        match rows {
            PeekResponse::Rows(rows) => {
                match finishing.finish(
                    rows,
                    max_result_size,
                    max_returned_query_size,
                    &duration_histogram,
                ) {
                    Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
                    Err(e) => yield PeekResponseUnary::Error(e),
                }
            }
            PeekResponse::Stashed(response) => {
                let response = *response;

                let shard_id = response.shard_id;

                // Turn the transmitted batch descriptions back into persist batches.
                let mut batches = Vec::new();
                for proto_batch in response.batches.into_iter() {
                    let batch =
                        persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);

                    batches.push(batch);
                }
                tracing::trace!(?batches, "stashed peek response");

                let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
                let read_schemas: Schemas<SourceData, ()> = Schemas {
                    id: None,
                    key: Arc::new(response.relation_desc.clone()),
                    val: Arc::new(UnitSchema),
                };

                let mut row_cursor = persist_client
                    .read_batches_consolidated::<_, _, _, i64>(
                        response.shard_id,
                        as_of,
                        read_schemas,
                        batches,
                        |_stats| true,
                        peek_stash_read_memory_budget_bytes,
                    )
                    .await
                    .expect("invalid usage");

                // Capacity 1: the reader task can be at most one chunk ahead of the
                // consumer below.
                let (tx, mut rx) = tokio::sync::mpsc::channel(1);
                mz_ore::task::spawn(|| "read_peek_batches", async move {
                    // Send the rows that were inlined in the response first.
                    let result = tx.send(response.inline_rows).await;
                    if result.is_err() {
                        tracing::debug!("receiver went away");
                    }

                    let mut current_batch = Vec::new();
                    let mut current_batch_size: usize = 0;

                    'outer: while let Some(rows) = row_cursor.next().await {
                        for ((source_data, _val), _ts, diff) in rows {
                            let row = source_data
                                .0
                                .expect("we are not sending errors on this code path");

                            let diff = usize::try_from(diff)
                                .expect("peek responses cannot have negative diffs");

                            // Rows whose diff is zero are skipped.
                            if diff > 0 {
                                let diff =
                                    NonZeroUsize::new(diff).expect("checked to be non-zero");
                                current_batch_size =
                                    current_batch_size.saturating_add(row.byte_len());
                                current_batch.push((row, diff));
                            }

                            // Flush the chunk once it exceeds the configured byte size.
                            if current_batch_size > peek_stash_read_batch_size_bytes {
                                let result = tx
                                    .send(RowCollection::new(
                                        current_batch.drain(..).collect_vec(),
                                        &[],
                                    ))
                                    .await;
                                if result.is_err() {
                                    tracing::debug!("receiver went away");
                                    // Stop reading, but still fall through to the cleanup
                                    // below.
                                    break 'outer;
                                }

                                current_batch_size = 0;
                            }
                        }
                    }

                    // Send whatever is left over in the last, partial chunk.
                    if current_batch.len() > 0 {
                        let result = tx.send(RowCollection::new(current_batch, &[])).await;
                        if result.is_err() {
                            tracing::debug!("receiver went away");
                        }
                    }

                    // Delete the stashed batches whether or not the consumer is still there.
                    let batches = row_cursor.into_lease();
                    tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
                    for batch in batches {
                        batch.delete().await;
                    }
                });

                assert!(
                    finishing.is_streamable(response.relation_desc.arity()),
                    "can only get stashed responses when the finishing is streamable"
                );

                tracing::trace!("query result is streamable!");

                // NOTE(review): this repeats the assertion made just above.
                assert!(finishing.is_streamable(response.relation_desc.arity()));
                let mut incremental_finishing = RowSetFinishingIncremental::new(
                    finishing.offset,
                    finishing.limit,
                    finishing.project,
                    max_returned_query_size,
                );

                let mut got_zero_rows = true;
                while let Some(rows) = rx.recv().await {
                    got_zero_rows = false;

                    let result_rows = incremental_finishing.finish_incremental(
                        rows,
                        max_result_size,
                        &duration_histogram,
                    );

                    match result_rows {
                        Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
                        Err(e) => yield PeekResponseUnary::Error(e),
                    }
                }

                // Always yield at least one (possibly empty) `Rows` item.
                if got_zero_rows {
                    let row_iter = vec![].into_row_iter();
                    yield PeekResponseUnary::Rows(Box::new(row_iter));
                }
            }
            PeekResponse::Canceled => {
                yield PeekResponseUnary::Canceled;
            }
            PeekResponse::Error(e) => {
                yield PeekResponseUnary::Error(e);
            }
        }
    })
}
1099
1100 #[mz_ore::instrument(level = "debug")]
1102 pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
1103 if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
1104 self.metrics
1105 .canceled_peeks
1106 .inc_by(u64::cast_from(uuids.len()));
1107
1108 let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
1109 for (uuid, compute_instance) in &uuids {
1110 inverse.entry(*compute_instance).or_default().insert(*uuid);
1111 }
1112 for (compute_instance, uuids) in inverse {
1113 for uuid in uuids {
1118 let _ = self.controller.compute.cancel_peek(
1119 compute_instance,
1120 uuid,
1121 PeekResponse::Canceled,
1122 );
1123 }
1124 }
1125
1126 let peeks = uuids
1127 .iter()
1128 .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
1129 .collect::<Vec<_>>();
1130 for peek in peeks {
1131 self.retire_execution(
1132 StatementEndedExecutionReason::Canceled,
1133 peek.ctx_extra.defuse(),
1134 );
1135 }
1136 }
1137 }
1138
1139 pub(crate) fn handle_peek_notification(
1142 &mut self,
1143 uuid: Uuid,
1144 notification: PeekNotification,
1145 otel_ctx: OpenTelemetryContext,
1146 ) {
1147 if let Some(PendingPeek {
1150 conn_id: _,
1151 cluster_id: _,
1152 depends_on: _,
1153 ctx_extra,
1154 is_fast_path,
1155 }) = self.remove_pending_peek(&uuid)
1156 {
1157 let reason = match notification {
1158 PeekNotification::Success {
1159 rows: num_rows,
1160 result_size,
1161 } => {
1162 let strategy = if is_fast_path {
1163 StatementExecutionStrategy::FastPath
1164 } else {
1165 StatementExecutionStrategy::Standard
1166 };
1167 StatementEndedExecutionReason::Success {
1168 result_size: Some(result_size),
1169 rows_returned: Some(num_rows),
1170 execution_strategy: Some(strategy),
1171 }
1172 }
1173 PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
1174 PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
1175 };
1176 otel_ctx.attach_as_parent();
1177 self.retire_execution(reason, ctx_extra.defuse());
1178 }
1179 }
1182
1183 pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
1185 let pending_peek = self.pending_peeks.remove(uuid);
1186 if let Some(pending_peek) = &pending_peek {
1187 let uuids = self
1188 .client_pending_peeks
1189 .get_mut(&pending_peek.conn_id)
1190 .expect("coord peek state is inconsistent");
1191 uuids.remove(uuid);
1192 if uuids.is_empty() {
1193 self.client_pending_peeks.remove(&pending_peek.conn_id);
1194 }
1195 }
1196 pending_peek
1197 }
1198
1199 pub(crate) async fn implement_slow_path_peek(
1205 &mut self,
1206 dataflow_plan: PeekDataflowPlan<mz_repr::Timestamp>,
1207 determination: TimestampDetermination<mz_repr::Timestamp>,
1208 finishing: RowSetFinishing,
1209 compute_instance: ComputeInstanceId,
1210 target_replica: Option<ReplicaId>,
1211 intermediate_result_type: SqlRelationType,
1212 source_ids: BTreeSet<GlobalId>,
1213 conn_id: ConnectionId,
1214 max_result_size: u64,
1215 max_query_result_size: Option<u64>,
1216 watch_set: Option<WatchSetCreation>,
1217 ) -> Result<ExecuteResponse, AdapterError> {
1218 let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
1223 if let Some(ws) = watch_set {
1224 self.install_peek_watch_sets(conn_id.clone(), ws)
1225 .map_err(|e| {
1226 AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1227 e,
1228 compute_instance,
1229 )
1230 })?;
1231 }
1232
1233 let source_arity = intermediate_result_type.arity();
1234
1235 let planned_peek = PlannedPeek {
1236 plan: PeekPlan::SlowPath(dataflow_plan),
1237 determination,
1238 conn_id,
1239 intermediate_result_type,
1240 source_arity,
1241 source_ids,
1242 };
1243
1244 self.implement_peek_plan(
1249 &mut ExecuteContextGuard::new(statement_logging_id, self.internal_cmd_tx.clone()),
1250 planned_peek,
1251 finishing,
1252 compute_instance,
1253 target_replica,
1254 max_result_size,
1255 max_query_result_size,
1256 )
1257 .await
1258 }
1259
1260 pub(crate) async fn implement_copy_to(
1273 &mut self,
1274 df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
1275 compute_instance: ComputeInstanceId,
1276 target_replica: Option<ReplicaId>,
1277 source_ids: BTreeSet<GlobalId>,
1278 conn_id: ConnectionId,
1279 watch_set: Option<WatchSetCreation>,
1280 tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
1281 ) {
1282 let send_err = |tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
1284 e: AdapterError| {
1285 let _ = tx.send(Err(e));
1286 };
1287
1288 if let Some(ws) = watch_set {
1292 if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
1293 let err = AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1294 e,
1295 compute_instance,
1296 );
1297 send_err(tx, err);
1298 return;
1299 }
1300 }
1301
1302 let sink_id = df_desc.sink_id();
1306
1307 let (sink_tx, sink_rx) = oneshot::channel();
1311 let active_copy_to = ActiveCopyTo {
1312 conn_id: conn_id.clone(),
1313 tx: sink_tx,
1314 cluster_id: compute_instance,
1315 depends_on: source_ids,
1316 };
1317
1318 drop(
1320 self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
1321 .await,
1322 );
1323
1324 if let Err(e) = self
1327 .try_ship_dataflow(df_desc, compute_instance, target_replica)
1328 .await
1329 .map_err(AdapterError::concurrent_dependency_drop_from_dataflow_creation_error)
1330 {
1331 self.remove_active_compute_sink(sink_id).await;
1335 send_err(tx, e);
1336 return;
1337 }
1338
1339 let span = Span::current();
1344 task::spawn(
1345 || "copy to completion",
1346 async move {
1347 let res = sink_rx.await;
1348 let result = match res {
1349 Ok(res) => res,
1350 Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
1351 };
1352
1353 let _ = tx.send(result);
1354 }
1355 .instrument(span),
1356 );
1357 }
1358
1359 pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
1363 where
1364 I: IntoRowIterator,
1365 I::Iter: Send + Sync + 'static,
1366 {
1367 let rows = Box::new(rows.into_row_iter());
1368 ExecuteResponse::SendingRowsImmediate { rows }
1369 }
1370}
1371
#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{Datum, SqlColumnType, SqlScalarType};

    use super::*;

    /// Pins the text rendering of `FastPathPlan` for a constant error, two `PeekExisting`
    /// plans (one without and one with literal lookup rows), and two constant row sets
    /// (a short one and a longer one).
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_fast_path_plan_as_text() {
        // A single non-nullable string column; shared by all constant plans below.
        let string_rel = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: SqlScalarType::String,
            nullable: false,
        }]);

        let err_plan =
            FastPathPlan::Constant(Err(EvalError::DivisionByZero), string_rel.clone());

        // Map + project over four input columns, with no literal lookup rows.
        let full_scan_mfp = MapFilterProject::new(4)
            .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
            .project([1, 4])
            .into_plan()
            .expect("invalid plan")
            .into_nontemporal()
            .expect("invalid nontemporal");
        let full_scan_plan = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            full_scan_mfp,
        );

        // An `IS NULL` filter over three input columns, with one literal lookup row.
        let lookup_mfp = MapFilterProject::new(3)
            .filter(Some(
                MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
            ))
            .into_plan()
            .expect("invalid plan")
            .into_nontemporal()
            .expect("invalid nontemporal");
        let lookup_plan = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            lookup_mfp,
        );

        // Every assertion renders with a fresh context built by `make_ctx`.
        let humanizer = DummyHumanizer;
        let explain_config = ExplainConfig {
            redacted: false,
            verbose_syntax: true,
            ..Default::default()
        };
        let make_ctx = || {
            PlanRenderingContext::<FastPathPlan>::new(
                Indent::default(),
                &humanizer,
                BTreeMap::new(),
                &explain_config,
            )
        };

        assert_eq!(
            text_string_at(&err_plan, make_ctx),
            "Error \"division by zero\"\n"
        );
        assert_eq!(
            text_string_at(&full_scan_plan, make_ctx),
            "Project (#1, #4)\n Map ((#0 OR #2))\n ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n"
        );
        assert_eq!(
            text_string_at(&lookup_plan, make_ctx),
            "Filter (#0) IS NULL\n ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n"
        );

        // Three distinct rows with multiplicities 1, 2 and 500.
        let mut rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        assert_eq!(
            text_string_at(
                &FastPathPlan::Constant(Ok(rows.clone()), string_rel.clone()),
                make_ctx
            ),
            "Constant\n - (\"hello\")\n - ((\"world\") x 2)\n - ((\"star\") x 500)\n"
        );

        // Append twenty more single-multiplicity rows ("0" through "19"). The expected
        // output below reports a total of 523 and lists only the first twenty rows.
        for i in 0..20 {
            rows.push((Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE));
        }
        let expected_long = "Constant\n total_rows (diffs absed): 523\n first_rows:\n - (\"hello\")\
        \n - ((\"world\") x 2)\n - ((\"star\") x 500)\n - (\"0\")\n - (\"1\")\
        \n - (\"2\")\n - (\"3\")\n - (\"4\")\n - (\"5\")\n - (\"6\")\
        \n - (\"7\")\n - (\"8\")\n - (\"9\")\n - (\"10\")\n - (\"11\")\
        \n - (\"12\")\n - (\"13\")\n - (\"14\")\n - (\"15\")\n - (\"16\")\n";
        assert_eq!(
            text_string_at(&FastPathPlan::Constant(Ok(rows), string_rel), make_ctx),
            expected_long
        );
    }
}