1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::ops::Deref;
19use std::sync::Arc;
20
21use differential_dataflow::consolidation::consolidate;
22use itertools::Itertools;
23use mz_adapter_types::compaction::CompactionWindow;
24use mz_adapter_types::connection::ConnectionId;
25use mz_cluster_client::ReplicaId;
26use mz_compute_client::controller::PeekNotification;
27use mz_compute_client::protocol::command::PeekTarget;
28use mz_compute_client::protocol::response::PeekResponse;
29use mz_compute_types::ComputeInstanceId;
30use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
31use mz_controller_types::ClusterId;
32use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
33use mz_expr::row::RowCollection;
34use mz_expr::{
35 EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
36 RowSetFinishingIncremental, permutation_for_arrangement,
37};
38use mz_ore::cast::CastFrom;
39use mz_ore::str::{StrExt, separated};
40use mz_ore::task;
41use mz_ore::tracing::OpenTelemetryContext;
42use mz_persist_client::Schemas;
43use mz_persist_types::codec_impls::UnitSchema;
44use mz_repr::explain::text::DisplayText;
45use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
46use mz_repr::{
47 Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, SqlRelationType,
48 preserves_order,
49};
50use mz_storage_types::sources::SourceData;
51use serde::{Deserialize, Serialize};
52use timely::progress::{Antichain, Timestamp};
53use tokio::sync::oneshot;
54use tracing::{Instrument, Span};
55use uuid::Uuid;
56
57use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
58use crate::coord::timestamp_selection::TimestampDetermination;
59use crate::optimize::OptimizerError;
60use crate::statement_logging::WatchSetCreation;
61use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
62use crate::{AdapterError, ExecuteContextGuard, ExecuteResponse};
63
/// State kept by the coordinator for a peek that has been issued but has not
/// yet completed (or been canceled).
#[derive(Debug)]
pub(crate) struct PendingPeek {
    /// The connection that issued the peek.
    pub(crate) conn_id: ConnectionId,
    /// The compute instance (cluster) the peek was issued against.
    pub(crate) cluster_id: ClusterId,
    /// IDs of the collections this peek depends on.
    pub(crate) depends_on: BTreeSet<GlobalId>,
    /// Statement-logging context; retired when the peek finishes or is canceled.
    pub(crate) ctx_extra: ExecuteContextGuard,
    /// Whether the peek was serviced via a fast path; used to report the
    /// execution strategy when the peek's notification arrives.
    pub(crate) is_fast_path: bool,
}
79
/// The client-facing outcome of a peek, after responses have been unified.
#[derive(Debug)]
pub enum PeekResponseUnary {
    /// The peek succeeded; iterate to obtain the result rows.
    Rows(Box<dyn RowIterator + Send + Sync>),
    /// The peek failed with the given error message.
    Error(String),
    /// The peek was canceled before it produced a result.
    Canceled,
}
90
/// Everything needed to service a "slow-path" peek by installing a dedicated
/// dataflow and peeking the index it exports.
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
    /// The dataflow to create in order to service the peek.
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    /// Id of the index the peek will target (and drop afterwards).
    pub(crate) id: GlobalId,
    /// Arrangement key, derived from the relation's default key.
    key: Vec<MirScalarExpr>,
    /// Column permutation produced by `permutation_for_arrangement` for the
    /// `(key, thinned value)` layout.
    permutation: Vec<usize>,
    /// Number of non-key ("thinned") columns in the arrangement.
    thinned_arity: usize,
}
99
100impl<T> PeekDataflowPlan<T> {
101 pub fn new(
102 desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
103 id: GlobalId,
104 typ: &SqlRelationType,
105 ) -> Self {
106 let arity = typ.arity();
107 let key = typ
108 .default_key()
109 .into_iter()
110 .map(MirScalarExpr::column)
111 .collect::<Vec<_>>();
112 let (permutation, thinning) = permutation_for_arrangement(&key, arity);
113 Self {
114 desc,
115 id,
116 key,
117 permutation,
118 thinned_arity: thinning.len(),
119 }
120 }
121}
122
/// A peek plan that can be serviced without installing a dedicated dataflow.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    /// The result is a literal row collection (or a literal error), with its
    /// relation type.
    Constant(Result<Vec<(Row, Diff)>, EvalError>, SqlRelationType),
    /// Read an existing index: the peeked collection id, the index id,
    /// optional literal key values to look up, and the map/filter/project to
    /// apply to matching rows.
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    /// Read directly from a storage (persist) collection: the collection id,
    /// an optional literal prefix constraint, and the map/filter/project.
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}
136
137impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
138 fn fmt_text(
139 &self,
140 f: &mut fmt::Formatter<'_>,
141 ctx: &mut PlanRenderingContext<'a, T>,
142 ) -> fmt::Result {
143 if ctx.config.verbose_syntax {
144 self.fmt_verbose_text(f, ctx)
145 } else {
146 self.fmt_default_text(f, ctx)
147 }
148 }
149}
150
151impl FastPathPlan {
    /// Renders the plan in the default (concise) `EXPLAIN` syntax.
    pub fn fmt_default_text<'a, T>(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        let mode = HumanizedExplain::new(ctx.config.redacted);

        match self {
            FastPathPlan::Constant(rows, _) => {
                write!(f, "{}→Constant ", ctx.indent)?;

                match rows {
                    Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
                    Err(err) => {
                        // Redacted mode hides the error text entirely.
                        if mode.redacted() {
                            writeln!(f, "(error: █)")?;
                        } else {
                            writeln!(f, "(error: {})", err.to_string().quoted(),)?;
                        }
                    }
                }
            }
            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
                // Prefer human-readable names, falling back to raw ids.
                let coll = ctx
                    .humanizer
                    .humanize_id(*coll_id)
                    .unwrap_or_else(|| coll_id.to_string());
                let idx = ctx
                    .humanizer
                    .humanize_id(*idx_id)
                    .unwrap_or_else(|| idx_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                // Save the current indent so `reset` can restore it below.
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                // Indent the source line one further only if the MFP printed
                // any expressions or predicates above it.
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraints) = literal_constraints {
                    writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
                    ctx.indent += 1;
                    let values = separated("; ", mode.seq(literal_constraints, None));
                    writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
                }

                ctx.indent.reset();
            }
            FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
                let coll = ctx
                    .humanizer
                    .humanize_id(*global_id)
                    .unwrap_or_else(|| global_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                // Save the current indent so `reset` can restore it below.
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraint) = literal_constraint {
                    writeln!(f, "{}→Index Lookup on {coll} (from storage)", ctx.indent)?;
                    ctx.indent += 1;
                    let value = mode.expr(literal_constraint, None);
                    writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→Indexed {coll} (from storage)", ctx.indent)?;
                }

                ctx.indent.reset();
            }
        }

        Ok(())
    }
236
237 pub fn fmt_verbose_text<'a, T>(
238 &self,
239 f: &mut fmt::Formatter<'_>,
240 ctx: &mut PlanRenderingContext<'a, T>,
241 ) -> fmt::Result {
242 let redacted = ctx.config.redacted;
243 let mode = HumanizedExplain::new(redacted);
244
245 match self {
248 FastPathPlan::Constant(Ok(rows), _) => {
249 if !rows.is_empty() {
250 writeln!(f, "{}Constant", ctx.indent)?;
251 *ctx.as_mut() += 1;
252 fmt_text_constant_rows(
253 f,
254 rows.iter().map(|(row, diff)| (row, diff)),
255 ctx.as_mut(),
256 redacted,
257 )?;
258 *ctx.as_mut() -= 1;
259 } else {
260 writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
261 }
262 Ok(())
263 }
264 FastPathPlan::Constant(Err(err), _) => {
265 if redacted {
266 writeln!(f, "{}Error █", ctx.as_mut())
267 } else {
268 writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
269 }
270 }
271 FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
272 ctx.as_mut().set();
273 let (map, filter, project) = mfp.as_map_filter_project();
274
275 let cols = if !ctx.config.humanized_exprs {
276 None
277 } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
278 let cols = itertools::chain(
282 cols.iter().cloned(),
283 std::iter::repeat(String::new()).take(map.len()),
284 )
285 .collect();
286 Some(cols)
287 } else {
288 None
289 };
290
291 if project.len() != mfp.input_arity + map.len()
292 || !project.iter().enumerate().all(|(i, o)| i == *o)
293 {
294 let outputs = mode.seq(&project, cols.as_ref());
295 let outputs = CompactScalars(outputs);
296 writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
297 *ctx.as_mut() += 1;
298 }
299 if !filter.is_empty() {
300 let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
301 writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
302 *ctx.as_mut() += 1;
303 }
304 if !map.is_empty() {
305 let scalars = mode.seq(&map, cols.as_ref());
306 let scalars = CompactScalars(scalars);
307 writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
308 *ctx.as_mut() += 1;
309 }
310 MirRelationExpr::fmt_indexed_filter(
311 f,
312 ctx,
313 coll_id,
314 idx_id,
315 literal_constraints.clone(),
316 None,
317 )?;
318 writeln!(f)?;
319 ctx.as_mut().reset();
320 Ok(())
321 }
322 FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
323 ctx.as_mut().set();
324 let (map, filter, project) = mfp.as_map_filter_project();
325
326 let cols = if !ctx.config.humanized_exprs {
327 None
328 } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
329 let cols = itertools::chain(
330 cols.iter().cloned(),
331 std::iter::repeat(String::new()).take(map.len()),
332 )
333 .collect::<Vec<_>>();
334 Some(cols)
335 } else {
336 None
337 };
338
339 if project.len() != mfp.input_arity + map.len()
340 || !project.iter().enumerate().all(|(i, o)| i == *o)
341 {
342 let outputs = mode.seq(&project, cols.as_ref());
343 let outputs = CompactScalars(outputs);
344 writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
345 *ctx.as_mut() += 1;
346 }
347 if !filter.is_empty() {
348 let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
349 writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
350 *ctx.as_mut() += 1;
351 }
352 if !map.is_empty() {
353 let scalars = mode.seq(&map, cols.as_ref());
354 let scalars = CompactScalars(scalars);
355 writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
356 *ctx.as_mut() += 1;
357 }
358 let human_id = ctx
359 .humanizer
360 .humanize_id(*gid)
361 .unwrap_or_else(|| gid.to_string());
362 write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
363 if let Some(literal) = literal_constraint {
364 let value = mode.expr(literal, None);
365 writeln!(f, " [value={}]", value)?;
366 } else {
367 writeln!(f, "")?;
368 }
369 ctx.as_mut().reset();
370 Ok(())
371 }
372 }?;
373 Ok(())
374 }
375}
376
/// A fully planned peek, ready to be implemented by the coordinator.
#[derive(Debug)]
pub struct PlannedPeek {
    /// How the peek will be serviced (fast path or dedicated dataflow).
    pub plan: PeekPlan,
    /// The timestamp (and surrounding context) the peek reads at.
    pub determination: TimestampDetermination<mz_repr::Timestamp>,
    /// The connection issuing the peek.
    pub conn_id: ConnectionId,
    /// Type of the rows produced before the `RowSetFinishing` is applied.
    pub intermediate_result_type: SqlRelationType,
    /// Arity of the intermediate result (kept in sync with
    /// `intermediate_result_type`).
    pub source_arity: usize,
    /// IDs of the collections the peek depends on.
    pub source_ids: BTreeSet<GlobalId>,
}
392
/// How a peek will be executed.
#[derive(Clone, Debug)]
pub enum PeekPlan<T = mz_repr::Timestamp> {
    /// Serve the peek from a constant, an existing index, or persist.
    FastPath(FastPathPlan),
    /// Install a dedicated dataflow and peek the index it exports.
    SlowPath(PeekDataflowPlan<T>),
}
400
401fn mfp_to_safe_plan(
406 mfp: mz_expr::MapFilterProject,
407) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
408 mfp.into_plan()
409 .map_err(OptimizerError::InternalUnsafeMfpPlan)?
410 .into_nontemporal()
411 .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
412}
413
414fn permute_oneshot_mfp_around_index(
416 mfp: mz_expr::MapFilterProject,
417 key: &[MirScalarExpr],
418) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
419 let input_arity = mfp.input_arity;
420 let mut safe_mfp = mfp_to_safe_plan(mfp)?;
421 let (permute, thinning) = permutation_for_arrangement(key, input_arity);
422 safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
423 Ok(safe_mfp)
424}
425
/// Attempts to derive a [`FastPathPlan`] for `dataflow_plan`, returning
/// `Ok(None)` when no fast path applies and the dataflow must actually be
/// installed (the slow path).
pub fn create_fast_path_plan<T: Timestamp>(
    dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
    view_id: GlobalId,
    finishing: Option<&RowSetFinishing>,
    persist_fast_path_limit: usize,
    persist_fast_path_order: bool,
) -> Result<Option<FastPathPlan>, OptimizerError> {
    // Only consider dataflows whose first object to build is the peeked view.
    if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
    {
        let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
        if let Some((rows, found_typ)) = mir.as_const() {
            // A constant collection (or constant error) needs no dataflow.
            let plan = FastPathPlan::Constant(rows.clone(), found_typ.clone());
            return Ok(Some(plan));
        } else {
            // Peel off a top-level TopK whose effect is already subsumed by
            // the finishing (same order, zero offset, limit at least as
            // restrictive), and continue with its input.
            if let MirRelationExpr::TopK {
                input,
                group_key,
                order_key,
                limit,
                offset,
                monotonic: _,
                expected_group_size: _,
            } = mir
            {
                if let Some(finishing) = finishing {
                    if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
                        let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
                            (None, _) => true,
                            (Some(..), None) => false,
                            (Some(topk_limit), Some(finishing_limit)) => {
                                // Only literal int64 TopK limits are comparable.
                                if let Some(l) = topk_limit.as_literal_int64() {
                                    l >= *finishing_limit
                                } else {
                                    false
                                }
                            }
                        };
                        if finishing_limits_at_least_as_topk {
                            mir = input;
                        }
                    }
                }
            }
            // Strip a leading Map/Filter/Project; a fast path exists if what
            // remains is a `Get` of an indexed/persisted collection or an
            // index-lookup join.
            let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
            match mir {
                MirRelationExpr::Get {
                    id: Id::Global(get_id),
                    typ: relation_typ,
                    ..
                } => {
                    // Prefer reading from an imported index on the collection.
                    for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
                        if desc.on_id == *get_id {
                            return Ok(Some(FastPathPlan::PeekExisting(
                                *get_id,
                                *index_id,
                                None,
                                permute_oneshot_mfp_around_index(mfp, &desc.key)?,
                            )));
                        }
                    }

                    let safe_mfp = mfp_to_safe_plan(mfp)?;
                    let (_maps, filters, projection) = safe_mfp.as_map_filter_project();

                    // If enabled, pack equality constraints on a leading,
                    // order-preserving column prefix into a literal lookup row.
                    let literal_constraint = if persist_fast_path_order {
                        let mut row = Row::default();
                        let mut packer = row.packer();
                        for (idx, col) in relation_typ.column_types.iter().enumerate() {
                            if !preserves_order(&col.scalar_type) {
                                break;
                            }
                            let col_expr = MirScalarExpr::column(idx);

                            // Stop at the first column without an equality
                            // constraint in the filters.
                            let Some((literal, _)) = filters
                                .iter()
                                .filter_map(|f| f.expr_eq_literal(&col_expr))
                                .next()
                            else {
                                break;
                            };
                            packer.extend_by_row(&literal);
                        }
                        if row.is_empty() { None } else { Some(row) }
                    } else {
                        None
                    };

                    // Check whether the finishing keeps a direct persist read
                    // cheap: compatible ordering and a small enough limit.
                    let finish_ok = match &finishing {
                        None => false,
                        Some(RowSetFinishing {
                            order_by,
                            limit,
                            offset,
                            ..
                        }) => {
                            let order_ok = if persist_fast_path_order {
                                order_by.iter().enumerate().all(|(idx, order)| {
                                    let column_idx = projection[order.column];
                                    if column_idx >= safe_mfp.input_arity {
                                        return false;
                                    }
                                    let column_type = &relation_typ.column_types[column_idx];
                                    let index_ok = idx == column_idx;
                                    let nulls_ok = !column_type.nullable || order.nulls_last;
                                    let asc_ok = !order.desc;
                                    let type_ok = preserves_order(&column_type.scalar_type);
                                    index_ok && nulls_ok && asc_ok && type_ok
                                })
                            } else {
                                order_by.is_empty()
                            };
                            let limit_ok = limit.map_or(false, |l| {
                                usize::cast_from(l) + *offset < persist_fast_path_limit
                            });
                            order_ok && limit_ok
                        }
                    };

                    // True if some unique key lies entirely within the literal
                    // constraint's column prefix.
                    let key_constraint = if let Some(literal) = &literal_constraint {
                        let prefix_len = literal.iter().count();
                        relation_typ
                            .keys
                            .iter()
                            .any(|k| k.iter().all(|idx| *idx < prefix_len))
                    } else {
                        false
                    };

                    if key_constraint || (filters.is_empty() && finish_ok) {
                        return Ok(Some(FastPathPlan::PeekPersist(
                            *get_id,
                            literal_constraint,
                            safe_mfp,
                        )));
                    }
                }
                MirRelationExpr::Join { implementation, .. } => {
                    // A join planned as an indexed filter is an index lookup.
                    if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
                        implementation
                    {
                        return Ok(Some(FastPathPlan::PeekExisting(
                            *coll_id,
                            *idx_id,
                            Some(vals.clone()),
                            permute_oneshot_mfp_around_index(mfp, key)?,
                        )));
                    }
                }
                _ => {}
            }
        }
    }
    Ok(None)
}
607
608impl FastPathPlan {
609 pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
610 match self {
611 FastPathPlan::Constant(..) => UsedIndexes::default(),
612 FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
613 if literal_constraints.is_some() {
614 UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
615 } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
616 UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
617 } else {
618 UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
619 }
620 }
621 FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
622 }
623 }
624}
625
626impl crate::coord::Coordinator {
    /// Implements a [`PlannedPeek`]: constants are answered immediately,
    /// fast-path peeks are issued against an index or persist, and slow-path
    /// peeks first install their dedicated dataflow. Returns a streaming
    /// response that yields the finished rows.
    #[mz_ore::instrument(level = "debug")]
    pub async fn implement_peek_plan(
        &mut self,
        ctx_extra: &mut ExecuteContextGuard,
        plan: PlannedPeek,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let PlannedPeek {
            plan: fast_path,
            determination,
            conn_id,
            intermediate_result_type,
            source_arity,
            source_ids,
        } = plan;

        // Constant results can be returned without involving any cluster.
        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
            let mut rows = match rows {
                Ok(rows) => rows,
                Err(e) => return Err(e.into()),
            };
            consolidate(&mut rows);

            // Expand (row, diff) pairs into (row, count); negative
            // multiplicities are an error in a constant result.
            let mut results = Vec::new();
            for (row, count) in rows {
                if count.is_negative() {
                    Err(EvalError::InvalidParameterValue(
                        format!("Negative multiplicity in constant result: {}", count).into(),
                    ))?
                };
                if count.is_positive() {
                    let count = usize::cast_from(
                        u64::try_from(count.into_inner())
                            .expect("known to be positive from check above"),
                    );
                    results.push((
                        row,
                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
                    ));
                }
            }
            let row_collection = RowCollection::new(results, &finishing.order_by);
            let duration_histogram = self.metrics.row_set_finishing_seconds();

            // Apply the finishing and record how the execution ended.
            let (ret, reason) = match finishing.finish(
                row_collection,
                max_result_size,
                max_returned_query_size,
                &duration_histogram,
            ) {
                Ok((rows, row_size_bytes)) => {
                    let result_size = u64::cast_from(row_size_bytes);
                    let rows_returned = u64::cast_from(rows.count());
                    (
                        Ok(Self::send_immediate_rows(rows)),
                        StatementEndedExecutionReason::Success {
                            result_size: Some(result_size),
                            rows_returned: Some(rows_returned),
                            execution_strategy: Some(StatementExecutionStrategy::Constant),
                        },
                    )
                }
                Err(error) => (
                    Err(AdapterError::ResultSize(error.clone())),
                    StatementEndedExecutionReason::Errored { error },
                ),
            };
            self.retire_execution(reason, std::mem::take(ctx_extra).defuse());
            return ret;
        }

        let timestamp = determination.timestamp_context.timestamp_or_default();
        if let Some(id) = ctx_extra.contents() {
            self.set_statement_execution_timestamp(id, timestamp)
        }

        // Prepare the peek command and target for each remaining plan kind.
        // `drop_dataflow` carries the transient index id to clean up for
        // slow-path peeks.
        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
            PeekPlan::FastPath(FastPathPlan::PeekExisting(
                _coll_id,
                idx_id,
                literal_constraints,
                map_filter_project,
            )) => (
                (literal_constraints, timestamp, map_filter_project),
                None,
                true,
                PeekTarget::Index { id: idx_id },
                StatementExecutionStrategy::FastPath,
            ),
            PeekPlan::FastPath(FastPathPlan::PeekPersist(
                coll_id,
                literal_constraint,
                map_filter_project,
            )) => {
                let peek_command = (
                    literal_constraint.map(|r| vec![r]),
                    timestamp,
                    map_filter_project,
                );
                let metadata = self
                    .controller
                    .storage
                    .collection_metadata(coll_id)
                    .expect("storage collection for fast-path peek")
                    .clone();
                (
                    peek_command,
                    None,
                    true,
                    PeekTarget::Persist {
                        id: coll_id,
                        metadata,
                    },
                    StatementExecutionStrategy::PersistFastPath,
                )
            }
            PeekPlan::SlowPath(PeekDataflowPlan {
                desc: dataflow,
                id: index_id,
                key: index_key,
                permutation: index_permutation,
                thinned_arity: index_thinned_arity,
            }) => {
                let output_ids = dataflow.export_ids().collect();

                // Install the dataflow and disable compaction on its outputs
                // so the peek timestamp remains readable.
                self.controller
                    .compute
                    .create_dataflow(compute_instance, dataflow, None)
                    .map_err(
                        AdapterError::concurrent_dependency_drop_from_dataflow_creation_error,
                    )?;
                self.initialize_compute_read_policies(
                    output_ids,
                    compute_instance,
                    CompactionWindow::DisableCompaction,
                )
                .await;

                // Build an MFP that undoes the arrangement's permutation.
                let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
                map_filter_project.permute_fn(
                    |c| index_permutation[c],
                    index_key.len() + index_thinned_arity,
                );
                let map_filter_project = mfp_to_safe_plan(map_filter_project)?;

                (
                    (None, timestamp, map_filter_project),
                    Some(index_id),
                    false,
                    PeekTarget::Index { id: index_id },
                    StatementExecutionStrategy::Standard,
                )
            }
            _ => {
                // `FastPathPlan::Constant` was fully handled above.
                unreachable!()
            }
        };

        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();

        // Pick a uuid not already used by another pending peek.
        let mut uuid = Uuid::new_v4();
        while self.pending_peeks.contains_key(&uuid) {
            uuid = Uuid::new_v4();
        }

        // Register the peek so notifications and cancellation can find it.
        self.pending_peeks.insert(
            uuid,
            PendingPeek {
                conn_id: conn_id.clone(),
                cluster_id: compute_instance,
                depends_on: source_ids,
                ctx_extra: std::mem::take(ctx_extra),
                is_fast_path,
            },
        );
        self.client_pending_peeks
            .entry(conn_id)
            .or_default()
            .insert(uuid, compute_instance);
        let (literal_constraints, timestamp, map_filter_project) = peek_command;

        // Synthesize column names; only the types matter for the peek.
        let peek_result_column_names =
            (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
        let peek_result_desc =
            RelationDesc::new(intermediate_result_type, peek_result_column_names);

        self.controller
            .compute
            .peek(
                compute_instance,
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                peek_result_desc,
                finishing.clone(),
                map_filter_project,
                target_replica,
                rows_tx,
            )
            .map_err(AdapterError::concurrent_dependency_drop_from_peek_error)?;

        let duration_histogram = self.metrics.row_set_finishing_seconds();

        // For slow-path peeks, schedule the transient dataflow for removal
        // now that the peek has been issued.
        if let Some(index_id) = drop_dataflow {
            self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
            self.drop_compute_collections(vec![(compute_instance, index_id)]);
        }

        let persist_client = self.persist_client.clone();
        let peek_stash_read_batch_size_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                .get(self.catalog().system_config().dyncfgs());
        let peek_stash_read_memory_budget_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                .get(self.catalog().system_config().dyncfgs());

        let peek_response_stream = Self::create_peek_response_stream(
            rows_rx,
            finishing,
            max_result_size,
            max_returned_query_size,
            duration_histogram,
            persist_client,
            peek_stash_read_batch_size_bytes,
            peek_stash_read_memory_budget_bytes,
        );

        Ok(crate::ExecuteResponse::SendingRowsStreaming {
            rows: Box::pin(peek_response_stream),
            instance_id: compute_instance,
            strategy,
        })
    }
888
889 #[mz_ore::instrument(level = "debug")]
893 pub(crate) fn create_peek_response_stream(
894 rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
895 finishing: RowSetFinishing,
896 max_result_size: u64,
897 max_returned_query_size: Option<u64>,
898 duration_histogram: prometheus::Histogram,
899 mut persist_client: mz_persist_client::PersistClient,
900 peek_stash_read_batch_size_bytes: usize,
901 peek_stash_read_memory_budget_bytes: usize,
902 ) -> impl futures::Stream<Item = PeekResponseUnary> {
903 async_stream::stream!({
904 let result = rows_rx.await;
905
906 let rows = match result {
907 Ok(rows) => rows,
908 Err(e) => {
909 yield PeekResponseUnary::Error(e.to_string());
910 return;
911 }
912 };
913
914 match rows {
915 PeekResponse::Rows(rows) => {
916 match finishing.finish(
917 rows,
918 max_result_size,
919 max_returned_query_size,
920 &duration_histogram,
921 ) {
922 Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
923 Err(e) => yield PeekResponseUnary::Error(e),
924 }
925 }
926 PeekResponse::Stashed(response) => {
927 let response = *response;
928
929 let shard_id = response.shard_id;
930
931 let mut batches = Vec::new();
932 for proto_batch in response.batches.into_iter() {
933 let batch =
934 persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);
935
936 batches.push(batch);
937 }
938 tracing::trace!(?batches, "stashed peek response");
939
940 let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
941 let read_schemas: Schemas<SourceData, ()> = Schemas {
942 id: None,
943 key: Arc::new(response.relation_desc.clone()),
944 val: Arc::new(UnitSchema),
945 };
946
947 let mut row_cursor = persist_client
948 .read_batches_consolidated::<_, _, _, i64>(
949 response.shard_id,
950 as_of,
951 read_schemas,
952 batches,
953 |_stats| true,
954 peek_stash_read_memory_budget_bytes,
955 )
956 .await
957 .expect("invalid usage");
958
959 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
978 mz_ore::task::spawn(|| "read_peek_batches", async move {
979 let result = tx.send(response.inline_rows).await;
989 if result.is_err() {
990 tracing::debug!("receiver went away");
991 }
992
993 let mut current_batch = Vec::new();
994 let mut current_batch_size: usize = 0;
995
996 'outer: while let Some(rows) = row_cursor.next().await {
997 for ((source_data, _val), _ts, diff) in rows {
998 let row = source_data
999 .0
1000 .expect("we are not sending errors on this code path");
1001
1002 let diff = usize::try_from(diff)
1003 .expect("peek responses cannot have negative diffs");
1004
1005 if diff > 0 {
1006 let diff =
1007 NonZeroUsize::new(diff).expect("checked to be non-zero");
1008 current_batch_size =
1009 current_batch_size.saturating_add(row.byte_len());
1010 current_batch.push((row, diff));
1011 }
1012
1013 if current_batch_size > peek_stash_read_batch_size_bytes {
1014 let result = tx
1020 .send(RowCollection::new(
1021 current_batch.drain(..).collect_vec(),
1022 &[],
1023 ))
1024 .await;
1025 if result.is_err() {
1026 tracing::debug!("receiver went away");
1027 break 'outer;
1030 }
1031
1032 current_batch_size = 0;
1033 }
1034 }
1035 }
1036
1037 if current_batch.len() > 0 {
1038 let result = tx.send(RowCollection::new(current_batch, &[])).await;
1039 if result.is_err() {
1040 tracing::debug!("receiver went away");
1041 }
1042 }
1043
1044 let batches = row_cursor.into_lease();
1045 tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
1046 for batch in batches {
1047 batch.delete().await;
1048 }
1049 });
1050
1051 assert!(
1052 finishing.is_streamable(response.relation_desc.arity()),
1053 "can only get stashed responses when the finishing is streamable"
1054 );
1055
1056 tracing::trace!("query result is streamable!");
1057
1058 assert!(finishing.is_streamable(response.relation_desc.arity()));
1059 let mut incremental_finishing = RowSetFinishingIncremental::new(
1060 finishing.offset,
1061 finishing.limit,
1062 finishing.project,
1063 max_returned_query_size,
1064 );
1065
1066 let mut got_zero_rows = true;
1067 while let Some(rows) = rx.recv().await {
1068 got_zero_rows = false;
1069
1070 let result_rows = incremental_finishing.finish_incremental(
1071 rows,
1072 max_result_size,
1073 &duration_histogram,
1074 );
1075
1076 match result_rows {
1077 Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
1078 Err(e) => yield PeekResponseUnary::Error(e),
1079 }
1080 }
1081
1082 if got_zero_rows {
1085 let row_iter = vec![].into_row_iter();
1086 yield PeekResponseUnary::Rows(Box::new(row_iter));
1087 }
1088 }
1089 PeekResponse::Canceled => {
1090 yield PeekResponseUnary::Canceled;
1091 }
1092 PeekResponse::Error(e) => {
1093 yield PeekResponseUnary::Error(e);
1094 }
1095 }
1096 })
1097 }
1098
1099 #[mz_ore::instrument(level = "debug")]
1101 pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
1102 if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
1103 self.metrics
1104 .canceled_peeks
1105 .inc_by(u64::cast_from(uuids.len()));
1106
1107 let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
1108 for (uuid, compute_instance) in &uuids {
1109 inverse.entry(*compute_instance).or_default().insert(*uuid);
1110 }
1111 for (compute_instance, uuids) in inverse {
1112 for uuid in uuids {
1117 let _ = self.controller.compute.cancel_peek(
1118 compute_instance,
1119 uuid,
1120 PeekResponse::Canceled,
1121 );
1122 }
1123 }
1124
1125 let peeks = uuids
1126 .iter()
1127 .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
1128 .collect::<Vec<_>>();
1129 for peek in peeks {
1130 self.retire_execution(
1131 StatementEndedExecutionReason::Canceled,
1132 peek.ctx_extra.defuse(),
1133 );
1134 }
1135 }
1136 }
1137
1138 pub(crate) fn handle_peek_notification(
1141 &mut self,
1142 uuid: Uuid,
1143 notification: PeekNotification,
1144 otel_ctx: OpenTelemetryContext,
1145 ) {
1146 if let Some(PendingPeek {
1149 conn_id: _,
1150 cluster_id: _,
1151 depends_on: _,
1152 ctx_extra,
1153 is_fast_path,
1154 }) = self.remove_pending_peek(&uuid)
1155 {
1156 let reason = match notification {
1157 PeekNotification::Success {
1158 rows: num_rows,
1159 result_size,
1160 } => {
1161 let strategy = if is_fast_path {
1162 StatementExecutionStrategy::FastPath
1163 } else {
1164 StatementExecutionStrategy::Standard
1165 };
1166 StatementEndedExecutionReason::Success {
1167 result_size: Some(result_size),
1168 rows_returned: Some(num_rows),
1169 execution_strategy: Some(strategy),
1170 }
1171 }
1172 PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
1173 PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
1174 };
1175 otel_ctx.attach_as_parent();
1176 self.retire_execution(reason, ctx_extra.defuse());
1177 }
1178 }
1181
1182 pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
1184 let pending_peek = self.pending_peeks.remove(uuid);
1185 if let Some(pending_peek) = &pending_peek {
1186 let uuids = self
1187 .client_pending_peeks
1188 .get_mut(&pending_peek.conn_id)
1189 .expect("coord peek state is inconsistent");
1190 uuids.remove(uuid);
1191 if uuids.is_empty() {
1192 self.client_pending_peeks.remove(&pending_peek.conn_id);
1193 }
1194 }
1195 pending_peek
1196 }
1197
1198 pub(crate) async fn implement_slow_path_peek(
1204 &mut self,
1205 dataflow_plan: PeekDataflowPlan<mz_repr::Timestamp>,
1206 determination: TimestampDetermination<mz_repr::Timestamp>,
1207 finishing: RowSetFinishing,
1208 compute_instance: ComputeInstanceId,
1209 target_replica: Option<ReplicaId>,
1210 intermediate_result_type: SqlRelationType,
1211 source_ids: BTreeSet<GlobalId>,
1212 conn_id: ConnectionId,
1213 max_result_size: u64,
1214 max_query_result_size: Option<u64>,
1215 watch_set: Option<WatchSetCreation>,
1216 ) -> Result<ExecuteResponse, AdapterError> {
1217 let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
1222 if let Some(ws) = watch_set {
1223 self.install_peek_watch_sets(conn_id.clone(), ws)
1224 .map_err(|e| {
1225 AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
1226 e,
1227 compute_instance,
1228 )
1229 })?;
1230 }
1231
1232 let source_arity = intermediate_result_type.arity();
1233
1234 let planned_peek = PlannedPeek {
1235 plan: PeekPlan::SlowPath(dataflow_plan),
1236 determination,
1237 conn_id,
1238 intermediate_result_type,
1239 source_arity,
1240 source_ids,
1241 };
1242
1243 self.implement_peek_plan(
1248 &mut ExecuteContextGuard::new(statement_logging_id, self.internal_cmd_tx.clone()),
1249 planned_peek,
1250 finishing,
1251 compute_instance,
1252 target_replica,
1253 max_result_size,
1254 max_query_result_size,
1255 )
1256 .await
1257 }
1258
    /// Implements a `COPY TO` by registering an active copy-to sink, shipping
    /// the dataflow, and spawning a task that forwards the sink's eventual
    /// result to `tx`. Errors are reported through `tx` rather than returned.
    pub(crate) async fn implement_copy_to(
        &mut self,
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    ) {
        // Helper to report an error to the waiting client, ignoring a
        // receiver that has already gone away.
        let send_err = |tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
                        e: AdapterError| {
            let _ = tx.send(Err(e));
        };

        if let Some(ws) = watch_set {
            if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
                let err = AdapterError::concurrent_dependency_drop_from_collection_lookup_error(
                    e,
                    compute_instance,
                );
                send_err(tx, err);
                return;
            }
        }

        let sink_id = df_desc.sink_id();

        // Register the sink before shipping the dataflow, so its completion
        // (delivered via `sink_tx`) cannot be missed.
        let (sink_tx, sink_rx) = oneshot::channel();
        let active_copy_to = ActiveCopyTo {
            conn_id: conn_id.clone(),
            tx: sink_tx,
            cluster_id: compute_instance,
            depends_on: source_ids,
        };

        drop(
            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
                .await,
        );

        // If shipping fails, unregister the sink and report the error.
        if let Err(e) = self
            .try_ship_dataflow(df_desc, compute_instance, target_replica)
            .await
            .map_err(AdapterError::concurrent_dependency_drop_from_dataflow_creation_error)
        {
            self.remove_active_compute_sink(sink_id).await;
            send_err(tx, e);
            return;
        }

        // Wait for the sink result off the coordinator task and forward it.
        let span = Span::current();
        task::spawn(
            || "copy to completion",
            async move {
                let res = sink_rx.await;
                let result = match res {
                    Ok(res) => res,
                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
                };

                let _ = tx.send(result);
            }
            .instrument(span),
        );
    }
1357
1358 pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
1362 where
1363 I: IntoRowIterator,
1364 I::Iter: Send + Sync + 'static,
1365 {
1366 let rows = Box::new(rows.into_row_iter());
1367 ExecuteResponse::SendingRowsImmediate { rows }
1368 }
1369}
1370
#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{Datum, SqlColumnType, SqlScalarType};

    use super::*;

    // Verifies the EXPLAIN-style text rendering of `FastPathPlan` variants:
    // a constant error, a full index scan, an index lookup, and constant row
    // sets in both their short form and the truncated "first_rows" form.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn test_fast_path_plan_as_text() {
        // Single non-nullable string column, shared by all plans below.
        let typ = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: SqlScalarType::String,
            nullable: false,
        }]);
        // A constant plan that evaluates to an error rather than rows.
        let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
        // Full index scan (no lookup key): map a disjunction, then project.
        let no_lookup = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            MapFilterProject::new(4)
                .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
                .project([1, 4])
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );
        // Point lookup on the index with key value (5), plus a filter.
        let lookup = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            MapFilterProject::new(3)
                .filter(Some(
                    MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
                ))
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );

        let humanizer = DummyHumanizer;
        let config = ExplainConfig {
            redacted: false,
            verbose_syntax: true,
            ..Default::default()
        };
        // Fresh rendering context per assertion (text_string_at consumes one).
        let ctx_gen = || {
            let indent = Indent::default();
            let annotations = BTreeMap::new();
            PlanRenderingContext::<FastPathPlan>::new(
                indent,
                &humanizer,
                annotations,
                &config,
                BTreeSet::default(),
            )
        };

        let constant_err_exp = "Error \"division by zero\"\n";
        // NOTE(review): "[DELETED INDEX]" presumably appears because
        // `DummyHumanizer` cannot resolve the index ids — confirm against the
        // humanizer implementation.
        let no_lookup_exp = "Project (#1, #4)\n  Map ((#0 OR #2))\n    ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
        let lookup_exp =
            "Filter (#0) IS NULL\n  ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";

        assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
        assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
        assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);

        // Small constant row set: rendered in full, with "x <diff>" suffixes
        // for multiplicities greater than one.
        let mut constant_rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let constant_exp1 =
            "Constant\n- (\"hello\")\n- ((\"world\") x 2)\n- ((\"star\") x 500)\n";
        assert_eq!(
            text_string_at(
                &FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
                ctx_gen
            ),
            constant_exp1
        );
        // Grow the row set past the rendering cutoff: output switches to a
        // summary ("total_rows" counts abs'ed diffs) plus a "first_rows"
        // prefix. 523 = 1 + 2 + 500 + 20.
        constant_rows
            .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
        let constant_exp2 = "Constant\n  total_rows (diffs absed): 523\n  first_rows:\n    - (\"hello\")\
        \n    - ((\"world\") x 2)\n    - ((\"star\") x 500)\n    - (\"0\")\n    - (\"1\")\
        \n    - (\"2\")\n    - (\"3\")\n    - (\"4\")\n    - (\"5\")\n    - (\"6\")\
        \n    - (\"7\")\n    - (\"8\")\n    - (\"9\")\n    - (\"10\")\n    - (\"11\")\
        \n    - (\"12\")\n    - (\"13\")\n    - (\"14\")\n    - (\"15\")\n    - (\"16\")\n";
        assert_eq!(
            text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
            constant_exp2
        );
    }
}