1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::ops::Deref;
19use std::sync::Arc;
20
21use differential_dataflow::consolidation::consolidate;
22use itertools::Itertools;
23use mz_adapter_types::connection::ConnectionId;
24use mz_cluster_client::ReplicaId;
25use mz_compute_client::controller::PeekNotification;
26use mz_compute_client::protocol::command::PeekTarget;
27use mz_compute_client::protocol::response::PeekResponse;
28use mz_compute_types::ComputeInstanceId;
29use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
30use mz_controller_types::ClusterId;
31use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
32use mz_expr::row::RowCollection;
33use mz_expr::{
34 EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
35 RowSetFinishingIncremental, permutation_for_arrangement,
36};
37use mz_ore::cast::CastFrom;
38use mz_ore::collections::CollectionExt;
39use mz_ore::soft_assert_eq_or_log;
40use mz_ore::str::{StrExt, separated};
41use mz_ore::task;
42use mz_ore::tracing::OpenTelemetryContext;
43use mz_persist_client::Schemas;
44use mz_persist_types::codec_impls::UnitSchema;
45use mz_repr::explain::text::DisplayText;
46use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
47use mz_repr::{
48 Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, SqlRelationType,
49 preserves_order,
50};
51use mz_storage_types::sources::SourceData;
52use serde::{Deserialize, Serialize};
53use timely::progress::Antichain;
54use tokio::sync::oneshot;
55use tracing::{Instrument, Span};
56use uuid::Uuid;
57
58use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
59use crate::coord::timestamp_selection::TimestampDetermination;
60use crate::optimize::OptimizerError;
61use crate::statement_logging::WatchSetCreation;
62use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
63use crate::{AdapterError, ExecuteContextGuard, ExecuteResponse};
64
/// State the coordinator retains for a peek that has been issued to a compute
/// instance but whose response has not yet been handled.
#[derive(Debug)]
pub(crate) struct PendingPeek {
    /// The connection that issued the peek.
    pub(crate) conn_id: ConnectionId,
    /// The cluster (compute instance) servicing the peek.
    pub(crate) cluster_id: ClusterId,
    /// IDs of collections the peek depends on; presumably consulted when
    /// dependencies are dropped while the peek is in flight — TODO confirm
    /// against the call sites outside this chunk.
    pub(crate) depends_on: BTreeSet<GlobalId>,
    /// Statement-logging/execution context, retired when the peek completes,
    /// errors, or is canceled.
    pub(crate) ctx_extra: ExecuteContextGuard,
    /// Whether the peek took a fast path (index/persist read) rather than a
    /// dedicated dataflow; used to report the execution strategy.
    pub(crate) is_fast_path: bool,
}
80
/// A client-facing peek result, produced after merging/finishing the raw
/// compute responses.
#[derive(Debug)]
pub enum PeekResponseUnary {
    /// Result rows (possibly one of several streamed batches).
    Rows(Box<dyn RowIterator + Send + Sync>),
    /// The peek failed; the error is rendered as a string.
    Error(String),
    /// The peek was canceled.
    Canceled,
    /// The peek could not complete because a dependency was dropped.
    DependencyDropped(DroppedDependency),
}
97
/// A named dependency that was dropped while something depending on it (e.g. a
/// pending peek) was still in flight.
#[derive(Clone, Debug)]
pub enum DroppedDependency {
    /// A dropped relation, identified by its (humanized) name.
    Relation { name: String },
    /// A dropped cluster, identified by its name.
    Cluster { name: String },
}
108
109impl fmt::Display for DroppedDependency {
110 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111 match self {
112 Self::Relation { name } => write!(f, "relation {}", name.quoted()),
113 Self::Cluster { name } => write!(f, "cluster {}", name.quoted()),
114 }
115 }
116}
117
118impl DroppedDependency {
119 pub fn query_terminated_error(&self) -> String {
122 format!("query could not complete because {self} was dropped")
123 }
124
125 pub fn to_concurrent_dependency_drop(&self) -> AdapterError {
127 let (kind, name) = match self {
128 Self::Relation { name } => ("relation", name.clone()),
129 Self::Cluster { name } => ("cluster", name.clone()),
130 };
131 AdapterError::ConcurrentDependencyDrop {
132 dependency_kind: kind,
133 dependency_id: name,
134 }
135 }
136}
137
/// The plan for a "slow-path" peek: a dedicated dataflow whose exported
/// arrangement is then peeked.
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan {
    /// The dataflow to install on the compute instance.
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan, ()>,
    /// The ID of the collection the dataflow exports and the peek reads.
    pub(crate) id: GlobalId,
    /// The key of the exported arrangement (the relation's default key; see
    /// `PeekDataflowPlan::new`).
    key: Vec<MirScalarExpr>,
    /// Column permutation mapping original columns into the arrangement's
    /// (key, thinned value) layout.
    permutation: Vec<usize>,
    /// Number of non-key ("thinned") columns in the arrangement.
    thinned_arity: usize,
}
146
147impl PeekDataflowPlan {
148 pub fn new(
149 desc: DataflowDescription<mz_compute_types::plan::Plan, ()>,
150 id: GlobalId,
151 typ: &SqlRelationType,
152 ) -> Self {
153 let arity = typ.arity();
154 let key = typ
155 .default_key()
156 .into_iter()
157 .map(MirScalarExpr::column)
158 .collect::<Vec<_>>();
159 let (permutation, thinning) = permutation_for_arrangement(&key, arity);
160 Self {
161 desc,
162 id,
163 key,
164 permutation,
165 thinned_arity: thinning.len(),
166 }
167 }
168}
169
/// A peek plan that can be answered without installing a new dataflow.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    /// The result is a constant collection (or a constant error), with the
    /// given relation type.
    Constant(Result<Vec<(Row, Diff)>, EvalError>, SqlRelationType),
    /// Peek an existing index: (collection ID, index ID, optional literal key
    /// lookups, MFP to apply to the arranged rows).
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    /// Read a persist-backed storage collection directly: (collection ID,
    /// optional literal prefix constraint, MFP to apply).
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}
183
184impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
185 fn fmt_text(
186 &self,
187 f: &mut fmt::Formatter<'_>,
188 ctx: &mut PlanRenderingContext<'a, T>,
189 ) -> fmt::Result {
190 if ctx.config.verbose_syntax {
191 self.fmt_verbose_text(f, ctx)
192 } else {
193 self.fmt_default_text(f, ctx)
194 }
195 }
196}
197
impl FastPathPlan {
    /// Renders the plan in the compact (non-verbose) `EXPLAIN` syntax.
    ///
    /// Rendering is top-down: the Map/Filter/Project is printed above the
    /// source it reads from. `ctx.indent` is mutated in place and restored via
    /// its `set()`/`reset()` bookkeeping, so the exact order of increments
    /// matters for output.
    pub fn fmt_default_text<'a, T>(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        let mode = HumanizedExplain::new(ctx.config.redacted);

        match self {
            FastPathPlan::Constant(rows, _) => {
                write!(f, "{}→Constant ", ctx.indent)?;

                match rows {
                    Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
                    Err(err) => {
                        // Redacted mode hides the error contents.
                        if mode.redacted() {
                            writeln!(f, "(error: █)")?;
                        } else {
                            writeln!(f, "(error: {})", err.to_string().quoted(),)?;
                        }
                    }
                }
            }
            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
                // Prefer humanized names; fall back to the raw IDs.
                let coll = ctx
                    .humanizer
                    .humanize_id(*coll_id)
                    .unwrap_or_else(|| coll_id.to_string());
                let idx = ctx
                    .humanizer
                    .humanize_id(*idx_id)
                    .unwrap_or_else(|| idx_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                // Remember the current indentation so we can restore it below.
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                // If the MFP printed anything, indent the source one level
                // further so it appears nested beneath it.
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraints) = literal_constraints {
                    writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
                    ctx.indent += 1;
                    let values = separated("; ", mode.seq(literal_constraints, None));
                    writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
                }

                ctx.indent.reset();
            }
            FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
                let coll = ctx
                    .humanizer
                    .humanize_id(*global_id)
                    .unwrap_or_else(|| global_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                // Same indentation dance as the PeekExisting branch above.
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraint) = literal_constraint {
                    writeln!(f, "{}→ReadStorage Lookup on {coll}", ctx.indent)?;
                    ctx.indent += 1;
                    let value = mode.expr(literal_constraint, None);
                    writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→ReadStorage {coll}", ctx.indent)?;
                }

                ctx.indent.reset();
            }
        }

        Ok(())
    }

    /// Renders the plan in the verbose `EXPLAIN` syntax: explicit `Project`,
    /// `Filter`, and `Map` stages above the source, each printed only when it
    /// does something.
    pub fn fmt_verbose_text<'a, T>(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        let redacted = ctx.config.redacted;
        let mode = HumanizedExplain::new(redacted);

        match self {
            FastPathPlan::Constant(Ok(rows), _) => {
                if !rows.is_empty() {
                    writeln!(f, "{}Constant", ctx.indent)?;
                    *ctx.as_mut() += 1;
                    fmt_text_constant_rows(
                        f,
                        rows.iter().map(|(row, diff)| (row, diff)),
                        ctx.as_mut(),
                        redacted,
                    )?;
                    *ctx.as_mut() -= 1;
                } else {
                    writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
                }
                Ok(())
            }
            FastPathPlan::Constant(Err(err), _) => {
                if redacted {
                    writeln!(f, "{}Error █", ctx.as_mut())
                } else {
                    writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
                }
            }
            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
                ctx.as_mut().set();
                let (map, filter, project) = mfp.as_map_filter_project();

                // Humanized column names, if enabled and known. Mapped columns
                // get empty names appended past the source columns.
                let cols = if !ctx.config.humanized_exprs {
                    None
                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
                    let cols = itertools::chain(
                        cols.iter().cloned(),
                        std::iter::repeat(String::new()).take(map.len()),
                    )
                    .collect();
                    Some(cols)
                } else {
                    None
                };

                // Print `Project` only if it is not the identity projection.
                if project.len() != mfp.input_arity + map.len()
                    || !project.iter().enumerate().all(|(i, o)| i == *o)
                {
                    let outputs = mode.seq(&project, cols.as_ref());
                    let outputs = CompactScalars(outputs);
                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
                    *ctx.as_mut() += 1;
                }
                if !filter.is_empty() {
                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
                    *ctx.as_mut() += 1;
                }
                if !map.is_empty() {
                    let scalars = mode.seq(&map, cols.as_ref());
                    let scalars = CompactScalars(scalars);
                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
                    *ctx.as_mut() += 1;
                }
                MirRelationExpr::fmt_indexed_filter(
                    f,
                    ctx,
                    coll_id,
                    idx_id,
                    literal_constraints.clone(),
                    None,
                )?;
                writeln!(f)?;
                ctx.as_mut().reset();
                Ok(())
            }
            FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
                ctx.as_mut().set();
                let (map, filter, project) = mfp.as_map_filter_project();

                let cols = if !ctx.config.humanized_exprs {
                    None
                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
                    let cols = itertools::chain(
                        cols.iter().cloned(),
                        std::iter::repeat(String::new()).take(map.len()),
                    )
                    .collect::<Vec<_>>();
                    Some(cols)
                } else {
                    None
                };

                // Print `Project` only if it is not the identity projection.
                if project.len() != mfp.input_arity + map.len()
                    || !project.iter().enumerate().all(|(i, o)| i == *o)
                {
                    let outputs = mode.seq(&project, cols.as_ref());
                    let outputs = CompactScalars(outputs);
                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
                    *ctx.as_mut() += 1;
                }
                if !filter.is_empty() {
                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
                    *ctx.as_mut() += 1;
                }
                if !map.is_empty() {
                    let scalars = mode.seq(&map, cols.as_ref());
                    let scalars = CompactScalars(scalars);
                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
                    *ctx.as_mut() += 1;
                }
                let human_id = ctx
                    .humanizer
                    .humanize_id(*gid)
                    .unwrap_or_else(|| gid.to_string());
                write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
                if let Some(literal) = literal_constraint {
                    let value = mode.expr(literal, None);
                    writeln!(f, " [value={}]", value)?;
                } else {
                    writeln!(f, "")?;
                }
                ctx.as_mut().reset();
                Ok(())
            }
        }?;
        Ok(())
    }
}
423
/// A fully planned peek, ready to be implemented against a compute instance.
#[derive(Debug)]
pub struct PlannedPeek {
    /// How to obtain the rows (fast path or dedicated dataflow).
    pub plan: PeekPlan,
    /// The timestamp determination the peek reads at.
    pub determination: TimestampDetermination,
    /// The connection issuing the peek.
    pub conn_id: ConnectionId,
    /// Relation type of the rows produced before the `RowSetFinishing` is
    /// applied.
    pub intermediate_result_type: SqlRelationType,
    /// Arity of the peeked collection (input arity of the final MFP).
    pub source_arity: usize,
    /// All collections the peek depends on.
    pub source_ids: BTreeSet<GlobalId>,
}
439
/// Possible ways to implement a peek.
#[derive(Clone, Debug)]
pub enum PeekPlan {
    /// The peek can be answered without installing a new dataflow.
    FastPath(FastPathPlan),
    /// The peek requires building and installing a dedicated dataflow.
    SlowPath(PeekDataflowPlan),
}
447
448fn mfp_to_safe_plan(
453 mfp: mz_expr::MapFilterProject,
454) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
455 mfp.into_plan()
456 .map_err(OptimizerError::InternalUnsafeMfpPlan)?
457 .into_nontemporal()
458 .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
459}
460
461fn permute_oneshot_mfp_around_index(
463 mfp: mz_expr::MapFilterProject,
464 key: &[MirScalarExpr],
465) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
466 let input_arity = mfp.input_arity;
467 let mut safe_mfp = mfp_to_safe_plan(mfp)?;
468 let (permute, thinning) = permutation_for_arrangement(key, input_arity);
469 safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
470 Ok(safe_mfp)
471}
472
473pub fn create_fast_path_plan(
479 dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr>,
480 view_id: GlobalId,
481 finishing: Option<&RowSetFinishing>,
482 persist_fast_path_limit: usize,
483 persist_fast_path_order: bool,
484) -> Result<Option<FastPathPlan>, OptimizerError> {
485 if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
491 {
492 let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
493 if let Some((rows, found_typ)) = mir.as_const() {
494 let plan = FastPathPlan::Constant(
496 rows.clone(),
497 mz_repr::SqlRelationType::from_repr(found_typ),
498 );
499 return Ok(Some(plan));
500 } else {
501 if let MirRelationExpr::TopK {
504 input,
505 group_key,
506 order_key,
507 limit,
508 offset,
509 monotonic: _,
510 expected_group_size: _,
511 } = mir
512 {
513 if let Some(finishing) = finishing {
514 if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
515 let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
518 (None, _) => true,
519 (Some(..), None) => false,
520 (Some(topk_limit), Some(finishing_limit)) => {
521 if let Some(l) = topk_limit.as_literal_int64() {
522 i128::cast_from(l)
523 >= i128::cast_from(*finishing_limit)
524 + i128::cast_from(finishing.offset)
525 } else {
526 false
527 }
528 }
529 };
530 if finishing_limits_at_least_as_topk {
531 mir = input;
532 }
533 }
534 }
535 }
536 let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
540 match mir {
541 MirRelationExpr::Get {
542 id: Id::Global(get_id),
543 typ: repr_typ,
544 ..
545 } => {
546 for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
548 if desc.on_id == *get_id {
549 return Ok(Some(FastPathPlan::PeekExisting(
550 *get_id,
551 *index_id,
552 None,
553 permute_oneshot_mfp_around_index(mfp, &desc.key)?,
554 )));
555 }
556 }
557
558 let safe_mfp = mfp_to_safe_plan(mfp)?;
562 let (_maps, filters, projection) = safe_mfp.as_map_filter_project();
563
564 let persist_fast_path_order_relation_typ = if persist_fast_path_order {
565 Some(
566 dataflow_plan
567 .source_imports
568 .get(get_id)
569 .expect("Get's ID is also imported")
570 .desc
571 .typ
572 .clone(),
573 )
574 } else {
575 None
576 };
577
578 let literal_constraint =
579 if let Some(relation_typ) = &persist_fast_path_order_relation_typ {
580 let mut row = Row::default();
581 let mut packer = row.packer();
582 for (idx, col) in relation_typ.column_types.iter().enumerate() {
583 if !preserves_order(&col.scalar_type) {
584 break;
585 }
586 let col_expr = MirScalarExpr::column(idx);
587
588 let Some((literal, _)) = filters
589 .iter()
590 .filter_map(|f| f.expr_eq_literal(&col_expr))
591 .next()
592 else {
593 break;
594 };
595 packer.extend_by_row(&literal);
596 }
597 if row.is_empty() { None } else { Some(row) }
598 } else {
599 None
600 };
601
602 let finish_ok = match &finishing {
603 None => false,
604 Some(RowSetFinishing {
605 order_by,
606 limit,
607 offset,
608 ..
609 }) => {
610 let order_ok =
611 if let Some(relation_typ) = &persist_fast_path_order_relation_typ {
612 order_by.iter().enumerate().all(|(idx, order)| {
613 let column_idx = projection[order.column];
616 if column_idx >= safe_mfp.input_arity {
617 return false;
618 }
619 let column_type = &relation_typ.column_types[column_idx];
620 let index_ok = idx == column_idx;
621 let nulls_ok = !column_type.nullable || order.nulls_last;
622 let asc_ok = !order.desc;
623 let type_ok = preserves_order(&column_type.scalar_type);
624 index_ok && nulls_ok && asc_ok && type_ok
625 })
626 } else {
627 order_by.is_empty()
628 };
629 let limit_ok = limit.map_or(false, |l| {
630 usize::cast_from(l) + *offset < persist_fast_path_limit
631 });
632 order_ok && limit_ok
633 }
634 };
635
636 let key_constraint = if let Some(literal) = &literal_constraint {
637 let prefix_len = literal.iter().count();
638 repr_typ
639 .keys
640 .iter()
641 .any(|k| k.iter().all(|idx| *idx < prefix_len))
642 } else {
643 false
644 };
645
646 if key_constraint || (filters.is_empty() && finish_ok) {
650 return Ok(Some(FastPathPlan::PeekPersist(
651 *get_id,
652 literal_constraint,
653 safe_mfp,
654 )));
655 }
656 }
657 MirRelationExpr::Join { implementation, .. } => {
658 if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
659 implementation
660 {
661 return Ok(Some(FastPathPlan::PeekExisting(
662 *coll_id,
663 *idx_id,
664 Some(vals.clone()),
665 permute_oneshot_mfp_around_index(mfp, key)?,
666 )));
667 }
668 }
669 _ => {}
671 }
672 }
673 }
674 Ok(None)
675}
676
677impl FastPathPlan {
678 pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
679 match self {
680 FastPathPlan::Constant(..) => UsedIndexes::default(),
681 FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
682 if literal_constraints.is_some() {
683 UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
684 } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
685 UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
686 } else {
687 UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
688 }
689 }
690 FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
691 }
692 }
693}
694
695impl crate::coord::Coordinator {
    /// Implements a planned peek against `compute_instance`, returning an
    /// `ExecuteResponse` that either carries immediate rows (constant plans)
    /// or a stream over the eventual peek response.
    ///
    /// For non-constant plans this registers a `PendingPeek` keyed by a fresh
    /// UUID and installs any slow-path dataflow before issuing the peek.
    #[mz_ore::instrument(level = "debug")]
    pub async fn implement_peek_plan(
        &mut self,
        ctx_extra: &mut ExecuteContextGuard,
        plan: PlannedPeek,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let PlannedPeek {
            plan: fast_path,
            determination,
            conn_id,
            intermediate_result_type,
            source_arity,
            source_ids,
        } = plan;

        // Constant plans are answered entirely locally: consolidate, finish,
        // retire the execution, and return immediately.
        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
            let mut rows = match rows {
                Ok(rows) => rows,
                Err(e) => return Err(e.into()),
            };
            consolidate(&mut rows);

            let mut results = Vec::new();
            for (row, count) in rows {
                // After consolidation a negative multiplicity is a planning
                // bug surfaced to the client as an error.
                if count.is_negative() {
                    Err(EvalError::InvalidParameterValue(
                        format!("Negative multiplicity in constant result: {}", count).into(),
                    ))?
                };
                if count.is_positive() {
                    let count = usize::cast_from(
                        u64::try_from(count.into_inner())
                            .expect("known to be positive from check above"),
                    );
                    results.push((
                        row,
                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
                    ));
                }
            }
            let row_collection = RowCollection::new(results, &finishing.order_by);
            let duration_histogram = self.metrics.row_set_finishing_seconds();

            let (ret, reason) = match finishing.finish(
                row_collection,
                max_result_size,
                max_returned_query_size,
                &duration_histogram,
            ) {
                Ok((rows, row_size_bytes)) => {
                    let result_size = u64::cast_from(row_size_bytes);
                    let rows_returned = u64::cast_from(rows.count());
                    (
                        Ok(Self::send_immediate_rows(rows)),
                        StatementEndedExecutionReason::Success {
                            result_size: Some(result_size),
                            rows_returned: Some(rows_returned),
                            execution_strategy: Some(StatementExecutionStrategy::Constant),
                        },
                    )
                }
                Err(error) => (
                    Err(AdapterError::ResultSize(error.clone())),
                    StatementEndedExecutionReason::Errored { error },
                ),
            };
            self.retire_execution(reason, std::mem::take(ctx_extra).defuse());
            return ret;
        }

        let timestamp = determination.timestamp_context.timestamp_or_default();
        if let Some(id) = ctx_extra.contents() {
            self.set_statement_execution_timestamp(id, timestamp)
        }

        // Prepare the peek per plan variant. Each arm yields:
        // - the peek command (literal constraints, timestamp, MFP),
        // - an optional transient dataflow ID to drop afterwards (slow path),
        // - whether this counts as a fast path,
        // - the peek target, the execution strategy for logging, and a read
        //   hold keeping the target readable at `timestamp`.
        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy, read_hold) =
            match fast_path {
                PeekPlan::FastPath(FastPathPlan::PeekExisting(
                    _coll_id,
                    idx_id,
                    literal_constraints,
                    map_filter_project,
                )) => {
                    let read_hold = self
                        .controller
                        .compute
                        .acquire_read_hold(compute_instance, idx_id)
                        .map_err(
                            AdapterError::concurrent_dependency_drop_from_collection_update_error,
                        )?;
                    (
                        (literal_constraints, timestamp, map_filter_project),
                        None,
                        true,
                        PeekTarget::Index { id: idx_id },
                        StatementExecutionStrategy::FastPath,
                        read_hold,
                    )
                }
                PeekPlan::FastPath(FastPathPlan::PeekPersist(
                    coll_id,
                    literal_constraint,
                    map_filter_project,
                )) => {
                    let peek_command = (
                        literal_constraint.map(|r| vec![r]),
                        timestamp,
                        map_filter_project,
                    );
                    let metadata = self
                        .controller
                        .storage
                        .collection_metadata(coll_id)
                        .expect("storage collection for fast-path peek")
                        .clone();
                    let read_hold = self
                        .controller
                        .storage_collections
                        .acquire_read_holds(vec![coll_id])
                        .map_err(AdapterError::concurrent_dependency_drop_from_collection_missing)?
                        .into_element();
                    (
                        peek_command,
                        None,
                        true,
                        PeekTarget::Persist {
                            id: coll_id,
                            metadata,
                        },
                        StatementExecutionStrategy::PersistFastPath,
                        read_hold,
                    )
                }
                PeekPlan::SlowPath(PeekDataflowPlan {
                    desc: dataflow,
                    id: index_id,
                    key: index_key,
                    permutation: index_permutation,
                    thinned_arity: index_thinned_arity,
                }) => {
                    // The dataflow must export exactly the collection we are
                    // about to peek; anything else is an internal error.
                    let exports: Vec<GlobalId> = dataflow.export_ids().collect();
                    soft_assert_eq_or_log!(
                        exports.as_slice(),
                        &[index_id],
                        "slow-path peek dataflow must export exactly [index_id]",
                    );
                    if exports.as_slice() != [index_id] {
                        return Err(AdapterError::internal(
                            "peek error",
                            format!(
                                "slow-path peek dataflow exports {exports:?}, expected [{index_id}]",
                            ),
                        ));
                    }

                    self.controller
                        .compute
                        .create_dataflow(compute_instance, dataflow, None)
                        .map_err(
                            AdapterError::concurrent_dependency_drop_from_dataflow_creation_error,
                        )?;

                    // If we fail to acquire the hold, tear the just-created
                    // dataflow back down before bailing.
                    let acquire_result = self
                        .controller
                        .compute
                        .acquire_read_hold(compute_instance, index_id)
                        .map_err(
                            AdapterError::concurrent_dependency_drop_from_collection_update_error,
                        );
                    let read_hold = match acquire_result {
                        Ok(hold) => hold,
                        Err(e) => {
                            self.drop_compute_collections(vec![(compute_instance, index_id)]);
                            return Err(e);
                        }
                    };

                    // The MFP only permutes columns into the arrangement's
                    // (key, thinned value) layout; all real mapping/filtering
                    // happened in the dataflow.
                    let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
                    map_filter_project.permute_fn(
                        |c| index_permutation[c],
                        index_key.len() + index_thinned_arity,
                    );
                    let map_filter_project = mfp_to_safe_plan(map_filter_project)?;

                    (
                        (None, timestamp, map_filter_project),
                        Some(index_id),
                        false,
                        PeekTarget::Index { id: index_id },
                        StatementExecutionStrategy::Standard,
                        read_hold,
                    )
                }
                PeekPlan::FastPath(_) => {
                    // Constant plans returned early above.
                    unreachable!()
                }
            };

        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();

        // Generate a UUID not already used by a pending peek.
        let mut uuid = Uuid::new_v4();
        while self.pending_peeks.contains_key(&uuid) {
            uuid = Uuid::new_v4();
        }

        let (literal_constraints, timestamp, map_filter_project) = peek_command;

        // Synthesize column names; the client never sees these.
        let peek_result_column_names =
            (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
        let peek_result_desc =
            RelationDesc::new(intermediate_result_type, peek_result_column_names);

        let peek_result = self
            .controller
            .compute
            .peek(
                compute_instance,
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                peek_result_desc,
                finishing.clone(),
                map_filter_project,
                read_hold,
                target_replica,
                rows_tx,
            )
            .map_err(AdapterError::concurrent_dependency_drop_from_peek_error);
        if let Err(e) = peek_result {
            // Clean up the transient slow-path dataflow on failure.
            if let Some(index_id) = drop_dataflow {
                self.drop_compute_collections(vec![(compute_instance, index_id)]);
            }
            return Err(e);
        }

        // Register the peek so responses/cancellation can find it.
        self.pending_peeks.insert(
            uuid,
            PendingPeek {
                conn_id: conn_id.clone(),
                cluster_id: compute_instance,
                depends_on: source_ids,
                ctx_extra: std::mem::take(ctx_extra),
                is_fast_path,
            },
        );
        self.client_pending_peeks
            .entry(conn_id)
            .or_default()
            .insert(uuid, compute_instance);

        let duration_histogram = self.metrics.row_set_finishing_seconds();

        // The transient dataflow can be dropped immediately: the issued peek
        // holds what it needs. NOTE(review): relies on the peek already being
        // registered above — confirm ordering requirements before reordering.
        if let Some(index_id) = drop_dataflow {
            self.drop_compute_collections(vec![(compute_instance, index_id)]);
        }

        let persist_client = self.persist_client.clone();
        let peek_stash_read_batch_size_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                .get(self.catalog().system_config().dyncfgs());
        let peek_stash_read_memory_budget_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                .get(self.catalog().system_config().dyncfgs());

        let peek_response_stream = Self::create_peek_response_stream(
            rows_rx,
            finishing,
            max_result_size,
            max_returned_query_size,
            duration_histogram,
            persist_client,
            peek_stash_read_batch_size_bytes,
            peek_stash_read_memory_budget_bytes,
        );

        Ok(crate::ExecuteResponse::SendingRowsStreaming {
            rows: Box::pin(peek_response_stream),
            instance_id: compute_instance,
            strategy,
        })
    }
1022
1023 #[mz_ore::instrument(level = "debug")]
1027 pub(crate) fn create_peek_response_stream(
1028 rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
1029 finishing: RowSetFinishing,
1030 max_result_size: u64,
1031 max_returned_query_size: Option<u64>,
1032 duration_histogram: prometheus::Histogram,
1033 mut persist_client: mz_persist_client::PersistClient,
1034 peek_stash_read_batch_size_bytes: usize,
1035 peek_stash_read_memory_budget_bytes: usize,
1036 ) -> impl futures::Stream<Item = PeekResponseUnary> {
1037 async_stream::stream!({
1038 let result = rows_rx.await;
1039
1040 let rows = match result {
1041 Ok(rows) => rows,
1042 Err(e) => {
1043 yield PeekResponseUnary::Error(e.to_string());
1044 return;
1045 }
1046 };
1047
1048 match rows {
1049 PeekResponse::Rows(rows) => {
1050 let rows = RowCollection::merge_sorted(&rows, &finishing.order_by);
1051 match finishing.finish(
1052 rows,
1053 max_result_size,
1054 max_returned_query_size,
1055 &duration_histogram,
1056 ) {
1057 Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
1058 Err(e) => yield PeekResponseUnary::Error(e),
1059 }
1060 }
1061 PeekResponse::Stashed(response) => {
1062 let response = *response;
1063
1064 let shard_id = response.shard_id;
1065
1066 let mut batches = Vec::new();
1067 for proto_batch in response.batches.into_iter() {
1068 let batch =
1069 persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);
1070
1071 batches.push(batch);
1072 }
1073 tracing::trace!(?batches, "stashed peek response");
1074
1075 let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
1076 let read_schemas: Schemas<SourceData, ()> = Schemas {
1077 id: None,
1078 key: Arc::new(response.relation_desc.clone()),
1079 val: Arc::new(UnitSchema),
1080 };
1081
1082 let mut row_cursor = persist_client
1083 .read_batches_consolidated::<_, _, _, i64>(
1084 response.shard_id,
1085 as_of,
1086 read_schemas,
1087 batches,
1088 |_stats| true,
1089 peek_stash_read_memory_budget_bytes,
1090 )
1091 .await
1092 .expect("invalid usage");
1093
1094 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1113 mz_ore::task::spawn(|| "read_peek_batches", async move {
1114 for rows in response.inline_rows {
1124 let result = tx.send(rows).await;
1125 if result.is_err() {
1126 tracing::debug!("receiver went away");
1127 }
1128 }
1129
1130 let mut current_batch = Vec::new();
1131 let mut current_batch_size: usize = 0;
1132
1133 'outer: while let Some(rows) = row_cursor.next().await {
1134 for ((source_data, _val), _ts, diff) in rows {
1135 let row = source_data
1136 .0
1137 .expect("we are not sending errors on this code path");
1138
1139 let diff = usize::try_from(diff)
1140 .expect("peek responses cannot have negative diffs");
1141
1142 if diff > 0 {
1143 let diff =
1144 NonZeroUsize::new(diff).expect("checked to be non-zero");
1145 current_batch_size =
1146 current_batch_size.saturating_add(row.byte_len());
1147 current_batch.push((row, diff));
1148 }
1149
1150 if current_batch_size > peek_stash_read_batch_size_bytes {
1151 let result = tx
1157 .send(RowCollection::new(
1158 current_batch.drain(..).collect_vec(),
1159 &[],
1160 ))
1161 .await;
1162 if result.is_err() {
1163 tracing::debug!("receiver went away");
1164 break 'outer;
1167 }
1168
1169 current_batch_size = 0;
1170 }
1171 }
1172 }
1173
1174 if current_batch.len() > 0 {
1175 let result = tx.send(RowCollection::new(current_batch, &[])).await;
1176 if result.is_err() {
1177 tracing::debug!("receiver went away");
1178 }
1179 }
1180
1181 let batches = row_cursor.into_lease();
1182 tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
1183 for batch in batches {
1184 batch.delete().await;
1185 }
1186 });
1187
1188 assert!(
1189 finishing.is_streamable(response.relation_desc.arity()),
1190 "can only get stashed responses when the finishing is streamable"
1191 );
1192
1193 tracing::trace!("query result is streamable!");
1194
1195 assert!(finishing.is_streamable(response.relation_desc.arity()));
1196 let mut incremental_finishing = RowSetFinishingIncremental::new(
1197 finishing.offset,
1198 finishing.limit,
1199 finishing.project,
1200 max_returned_query_size,
1201 );
1202
1203 let mut got_zero_rows = true;
1204 while let Some(rows) = rx.recv().await {
1205 got_zero_rows = false;
1206
1207 let result_rows = incremental_finishing.finish_incremental(
1208 rows,
1209 max_result_size,
1210 &duration_histogram,
1211 );
1212
1213 match result_rows {
1214 Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
1215 Err(e) => yield PeekResponseUnary::Error(e),
1216 }
1217 }
1218
1219 if got_zero_rows {
1222 let row_iter = vec![].into_row_iter();
1223 yield PeekResponseUnary::Rows(Box::new(row_iter));
1224 }
1225 }
1226 PeekResponse::Canceled => {
1227 yield PeekResponseUnary::Canceled;
1228 }
1229 PeekResponse::Error(e) => {
1230 yield PeekResponseUnary::Error(e);
1231 }
1232 }
1233 })
1234 }
1235
    /// Cancels and retires all pending peeks issued by `conn_id`.
    ///
    /// Cancellation requests to the compute controller are best-effort; the
    /// corresponding executions are retired as `Canceled` regardless.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
        if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
            self.metrics
                .canceled_peeks
                .inc_by(u64::cast_from(uuids.len()));

            // Group peek UUIDs by compute instance so each instance is
            // addressed once.
            let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
            for (uuid, compute_instance) in &uuids {
                inverse.entry(*compute_instance).or_default().insert(*uuid);
            }
            for (compute_instance, uuids) in inverse {
                for uuid in uuids {
                    // Ignore controller errors: the peek may already be gone.
                    let _ = self.controller.compute.cancel_peek(
                        compute_instance,
                        uuid,
                        PeekResponse::Canceled,
                    );
                }
            }

            // Remove the peeks from the coordinator's state and retire their
            // executions.
            let peeks = uuids
                .iter()
                .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
                .collect::<Vec<_>>();
            for peek in peeks {
                self.retire_execution(
                    StatementEndedExecutionReason::Canceled,
                    peek.ctx_extra.defuse(),
                );
            }
        }
    }
1274
    /// Handles a peek completion notification from the compute controller,
    /// retiring the corresponding pending peek's execution with the matching
    /// end-of-execution reason.
    ///
    /// Notifications for unknown UUIDs (e.g. already-canceled peeks) are
    /// silently ignored.
    pub(crate) fn handle_peek_notification(
        &mut self,
        uuid: Uuid,
        notification: PeekNotification,
        otel_ctx: OpenTelemetryContext,
    ) {
        if let Some(PendingPeek {
            conn_id: _,
            cluster_id: _,
            depends_on: _,
            ctx_extra,
            is_fast_path,
        }) = self.remove_pending_peek(&uuid)
        {
            let reason = match notification {
                PeekNotification::Success {
                    rows: num_rows,
                    result_size,
                } => {
                    // Report the strategy recorded when the peek was issued.
                    let strategy = if is_fast_path {
                        StatementExecutionStrategy::FastPath
                    } else {
                        StatementExecutionStrategy::Standard
                    };
                    StatementEndedExecutionReason::Success {
                        result_size: Some(result_size),
                        rows_returned: Some(num_rows),
                        execution_strategy: Some(strategy),
                    }
                }
                PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
                PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
            };
            // Attach the tracing context from the controller notification so
            // the retirement is parented correctly.
            otel_ctx.attach_as_parent();
            self.retire_execution(reason, ctx_extra.defuse());
        }
    }
1318
1319 pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
1321 let pending_peek = self.pending_peeks.remove(uuid);
1322 if let Some(pending_peek) = &pending_peek {
1323 let uuids = self
1324 .client_pending_peeks
1325 .get_mut(&pending_peek.conn_id)
1326 .expect("coord peek state is inconsistent");
1327 uuids.remove(uuid);
1328 if uuids.is_empty() {
1329 self.client_pending_peeks.remove(&pending_peek.conn_id);
1330 }
1331 }
1332 pending_peek
1333 }
1334
1335 pub(crate) async fn implement_slow_path_peek(
1341 &mut self,
1342 dataflow_plan: PeekDataflowPlan,
1343 determination: TimestampDetermination,
1344 finishing: RowSetFinishing,
1345 compute_instance: ComputeInstanceId,
1346 target_replica: Option<ReplicaId>,
1347 intermediate_result_type: SqlRelationType,
1348 source_ids: BTreeSet<GlobalId>,
1349 conn_id: ConnectionId,
1350 max_result_size: u64,
1351 max_query_result_size: Option<u64>,
1352 watch_set: Option<WatchSetCreation>,
1353 ) -> Result<ExecuteResponse, AdapterError> {
1354 let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
1359 if let Some(ws) = watch_set {
1360 self.install_peek_watch_sets(conn_id.clone(), ws)
1361 .map_err(|e| {
1362 AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e)
1363 })?;
1364 }
1365
1366 let source_arity = intermediate_result_type.arity();
1367
1368 let planned_peek = PlannedPeek {
1369 plan: PeekPlan::SlowPath(dataflow_plan),
1370 determination,
1371 conn_id,
1372 intermediate_result_type,
1373 source_arity,
1374 source_ids,
1375 };
1376
1377 self.implement_peek_plan(
1382 &mut ExecuteContextGuard::new(statement_logging_id, self.internal_cmd_tx.clone()),
1383 planned_peek,
1384 finishing,
1385 compute_instance,
1386 target_replica,
1387 max_result_size,
1388 max_query_result_size,
1389 )
1390 .await
1391 }
1392
1393 pub(crate) async fn implement_copy_to(
1406 &mut self,
1407 df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
1408 compute_instance: ComputeInstanceId,
1409 target_replica: Option<ReplicaId>,
1410 source_ids: BTreeSet<GlobalId>,
1411 conn_id: ConnectionId,
1412 watch_set: Option<WatchSetCreation>,
1413 tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
1414 ) {
1415 let send_err = |tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
1417 e: AdapterError| {
1418 let _ = tx.send(Err(e));
1419 };
1420
1421 if let Some(ws) = watch_set {
1425 if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
1426 let err = AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e);
1427 send_err(tx, err);
1428 return;
1429 }
1430 }
1431
1432 let sink_id = df_desc.sink_id();
1436
1437 let (sink_tx, sink_rx) = oneshot::channel();
1441 let active_copy_to = ActiveCopyTo {
1442 conn_id: conn_id.clone(),
1443 tx: sink_tx,
1444 cluster_id: compute_instance,
1445 depends_on: source_ids,
1446 };
1447
1448 drop(
1450 self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
1451 .await,
1452 );
1453
1454 if let Err(e) = self
1457 .try_ship_dataflow(df_desc, compute_instance, target_replica)
1458 .await
1459 .map_err(AdapterError::concurrent_dependency_drop_from_dataflow_creation_error)
1460 {
1461 self.remove_active_compute_sink(sink_id).await;
1465 send_err(tx, e);
1466 return;
1467 }
1468
1469 let span = Span::current();
1474 task::spawn(
1475 || "copy to completion",
1476 async move {
1477 let res = sink_rx.await;
1478 let result = match res {
1479 Ok(res) => res,
1480 Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
1481 };
1482
1483 let _ = tx.send(result);
1484 }
1485 .instrument(span),
1486 );
1487 }
1488
1489 pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
1493 where
1494 I: IntoRowIterator,
1495 I::Iter: Send + Sync + 'static,
1496 {
1497 let rows = Box::new(rows.into_row_iter());
1498 ExecuteResponse::SendingRowsImmediate { rows }
1499 }
1500}
1501
#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{Datum, SqlColumnType, SqlScalarType};

    use super::*;

    // Golden test for the EXPLAIN text rendering of `FastPathPlan` variants:
    // a constant error, index peeks with and without literal lookup keys,
    // and constant row collections (both the full form and the truncated
    // form used once the collection grows past the display limit).
    //
    // Ignored under miri: plan construction/rendering is too slow there.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn test_fast_path_plan_as_text() {
        // A single non-nullable string column, shared by all plans below.
        let typ = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: SqlScalarType::String,
            nullable: false,
        }]);
        // A constant plan carrying an evaluation error.
        let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
        // An index peek with no lookup key: renders as a full scan.
        let no_lookup = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            MapFilterProject::new(4)
                .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
                .project([1, 4])
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );
        // An index peek with a literal lookup key: renders as a point lookup.
        let lookup = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            MapFilterProject::new(3)
                .filter(Some(
                    MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
                ))
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );

        // DummyHumanizer knows no catalog names, hence the "[DELETED INDEX]"
        // placeholders in the expected output below.
        let humanizer = DummyHumanizer;
        let config = ExplainConfig {
            redacted: false,
            verbose_syntax: true,
            ..Default::default()
        };
        // Fresh rendering context per assertion, since rendering consumes it.
        let ctx_gen = || {
            let indent = Indent::default();
            let annotations = BTreeMap::new();
            PlanRenderingContext::<FastPathPlan>::new(
                indent,
                &humanizer,
                annotations,
                &config,
                BTreeSet::default(),
            )
        };

        // Expected renderings; these must match the `DisplayText` impl for
        // `FastPathPlan` byte for byte.
        let constant_err_exp = "Error \"division by zero\"\n";
        let no_lookup_exp = "Project (#1, #4)\n Map ((#0 OR #2))\n ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
        let lookup_exp =
            "Filter (#0) IS NULL\n ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";

        assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
        assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
        assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);

        // A small constant collection: rendered in full, with "x N" suffixes
        // for rows whose diff is greater than one.
        let mut constant_rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let constant_exp1 =
            "Constant\n - (\"hello\")\n - ((\"world\") x 2)\n - ((\"star\") x 500)\n";
        assert_eq!(
            text_string_at(
                &FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
                ctx_gen
            ),
            constant_exp1
        );
        // Grow the collection past the display limit so the renderer switches
        // to the truncated "total_rows ... first_rows" form. The total of 523
        // is 1 + 2 + 500 + 20 (sum of absolute diffs).
        // NOTE(review): "diffs absed" mirrors the renderer's literal output;
        // it reads like a typo for "abs'ed" but must not be "fixed" here
        // without also changing the renderer.
        constant_rows
            .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
        let constant_exp2 = "Constant\n total_rows (diffs absed): 523\n first_rows:\n - (\"hello\")\
            \n - ((\"world\") x 2)\n - ((\"star\") x 500)\n - (\"0\")\n - (\"1\")\
            \n - (\"2\")\n - (\"3\")\n - (\"4\")\n - (\"5\")\n - (\"6\")\
            \n - (\"7\")\n - (\"8\")\n - (\"9\")\n - (\"10\")\n - (\"11\")\
            \n - (\"12\")\n - (\"13\")\n - (\"14\")\n - (\"15\")\n - (\"16\")\n";
        assert_eq!(
            text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
            constant_exp2
        );
    }
}