1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::ops::Deref;
19use std::sync::Arc;
20
21use differential_dataflow::consolidation::consolidate;
22use itertools::Itertools;
23use mz_adapter_types::compaction::CompactionWindow;
24use mz_adapter_types::connection::ConnectionId;
25use mz_cluster_client::ReplicaId;
26use mz_compute_client::controller::PeekNotification;
27use mz_compute_client::protocol::command::PeekTarget;
28use mz_compute_client::protocol::response::PeekResponse;
29use mz_compute_types::ComputeInstanceId;
30use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
31use mz_controller_types::ClusterId;
32use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
33use mz_expr::row::RowCollection;
34use mz_expr::{
35 EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
36 RowSetFinishingIncremental, permutation_for_arrangement,
37};
38use mz_ore::cast::CastFrom;
39use mz_ore::str::{StrExt, separated};
40use mz_ore::task;
41use mz_ore::tracing::OpenTelemetryContext;
42use mz_persist_client::Schemas;
43use mz_persist_types::codec_impls::UnitSchema;
44use mz_repr::explain::text::DisplayText;
45use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
46use mz_repr::{
47 Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, SqlRelationType,
48 preserves_order,
49};
50use mz_storage_types::sources::SourceData;
51use serde::{Deserialize, Serialize};
52use timely::progress::{Antichain, Timestamp};
53use tokio::sync::oneshot;
54use tracing::{Instrument, Span};
55use uuid::Uuid;
56
57use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
58use crate::coord::timestamp_selection::TimestampDetermination;
59use crate::optimize::OptimizerError;
60use crate::statement_logging::WatchSetCreation;
61use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
62use crate::{AdapterError, ExecuteContextGuard, ExecuteResponse};
63
/// A peek (a read of a collection at a timestamp) that the coordinator has
/// issued to a compute instance and for which a response is still pending.
#[derive(Debug)]
pub(crate) struct PendingPeek {
    /// The connection that initiated the peek.
    pub(crate) conn_id: ConnectionId,
    /// The cluster (compute instance) executing the peek.
    pub(crate) cluster_id: ClusterId,
    /// Global IDs the peek depends on (its source collections).
    pub(crate) depends_on: BTreeSet<GlobalId>,
    /// Execution-context state, used to retire the statement execution
    /// (statement logging) when the peek completes or is canceled.
    pub(crate) ctx_extra: ExecuteContextGuard,
    /// Whether the peek took a fast path; used to report the execution
    /// strategy when the peek's notification arrives.
    pub(crate) is_fast_path: bool,
}
79
/// The client-facing outcome of a peek: result rows, an error message, or a
/// cancellation notice.
#[derive(Debug)]
pub enum PeekResponseUnary {
    /// The rows of a (possibly partial, when streaming) successful result.
    Rows(Box<dyn RowIterator + Send + Sync>),
    /// The error message of a failed peek.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
90
/// A plan for a "slow-path" peek: build the described dataflow, then peek the
/// arrangement identified by `id`.
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
    /// The dataflow to build for this peek.
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    /// The ID of the index (arrangement) to peek; the dataflow is dropped
    /// again once the peek has been issued.
    pub(crate) id: GlobalId,
    /// The key expressions of the arrangement (the relation's default key).
    key: Vec<MirScalarExpr>,
    /// Column permutation mapping the arranged representation back to the
    /// original column order (from `permutation_for_arrangement`).
    permutation: Vec<usize>,
    /// Number of columns in the "thinned" (non-key) portion of the
    /// arrangement's values.
    thinned_arity: usize,
}
99
100impl<T> PeekDataflowPlan<T> {
101 pub fn new(
102 desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
103 id: GlobalId,
104 typ: &SqlRelationType,
105 ) -> Self {
106 let arity = typ.arity();
107 let key = typ
108 .default_key()
109 .into_iter()
110 .map(MirScalarExpr::column)
111 .collect::<Vec<_>>();
112 let (permutation, thinning) = permutation_for_arrangement(&key, arity);
113 Self {
114 desc,
115 id,
116 key,
117 permutation,
118 thinned_arity: thinning.len(),
119 }
120 }
121}
122
/// Ways a peek can be executed without building and executing a full
/// dataflow ("fast path").
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    /// The query is a constant: the result rows (or an evaluation error) are
    /// already known, along with their relation type.
    Constant(Result<Vec<(Row, Diff)>, EvalError>, SqlRelationType),
    /// Peek an existing index: the peeked collection's ID, the index's ID,
    /// optional literal key values to look up, and a map/filter/project to
    /// apply to the index contents.
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    /// Peek a persist-backed storage collection directly: the collection's
    /// ID, an optional literal prefix constraint on its key columns, and a
    /// map/filter/project to apply.
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}
136
137impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
138 fn fmt_text(
139 &self,
140 f: &mut fmt::Formatter<'_>,
141 ctx: &mut PlanRenderingContext<'a, T>,
142 ) -> fmt::Result {
143 if ctx.config.verbose_syntax {
144 self.fmt_verbose_text(f, ctx)
145 } else {
146 self.fmt_default_text(f, ctx)
147 }
148 }
149}
150
151impl FastPathPlan {
152 pub fn fmt_default_text<'a, T>(
153 &self,
154 f: &mut fmt::Formatter<'_>,
155 ctx: &mut PlanRenderingContext<'a, T>,
156 ) -> fmt::Result {
157 let mode = HumanizedExplain::new(ctx.config.redacted);
158
159 match self {
160 FastPathPlan::Constant(rows, _) => {
161 write!(f, "{}→Constant ", ctx.indent)?;
162
163 match rows {
164 Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
165 Err(err) => {
166 if mode.redacted() {
167 writeln!(f, "(error: █)")?;
168 } else {
169 writeln!(f, "(error: {})", err.to_string().quoted(),)?;
170 }
171 }
172 }
173 }
174 FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
175 let coll = ctx
176 .humanizer
177 .humanize_id(*coll_id)
178 .unwrap_or_else(|| coll_id.to_string());
179 let idx = ctx
180 .humanizer
181 .humanize_id(*idx_id)
182 .unwrap_or_else(|| idx_id.to_string());
183 writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
184 ctx.indent.set();
185
186 ctx.indent += 1;
187
188 mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
189 let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();
190
191 if printed {
192 ctx.indent += 1;
193 }
194 if let Some(literal_constraints) = literal_constraints {
195 writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
196 ctx.indent += 1;
197 let values = separated("; ", mode.seq(literal_constraints, None));
198 writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
199 } else {
200 writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
201 }
202
203 ctx.indent.reset();
204 }
205 FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
206 let coll = ctx
207 .humanizer
208 .humanize_id(*global_id)
209 .unwrap_or_else(|| global_id.to_string());
210 writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
211 ctx.indent.set();
212
213 ctx.indent += 1;
214
215 mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
216 let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();
217
218 if printed {
219 ctx.indent += 1;
220 }
221 if let Some(literal_constraint) = literal_constraint {
222 writeln!(f, "{}→Index Lookup on {coll} (from storage)", ctx.indent)?;
223 ctx.indent += 1;
224 let value = mode.expr(literal_constraint, None);
225 writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
226 } else {
227 writeln!(f, "{}→Indexed {coll} (from storage)", ctx.indent)?;
228 }
229
230 ctx.indent.reset();
231 }
232 }
233
234 Ok(())
235 }
236
237 pub fn fmt_verbose_text<'a, T>(
238 &self,
239 f: &mut fmt::Formatter<'_>,
240 ctx: &mut PlanRenderingContext<'a, T>,
241 ) -> fmt::Result {
242 let redacted = ctx.config.redacted;
243 let mode = HumanizedExplain::new(redacted);
244
245 match self {
248 FastPathPlan::Constant(Ok(rows), _) => {
249 if !rows.is_empty() {
250 writeln!(f, "{}Constant", ctx.indent)?;
251 *ctx.as_mut() += 1;
252 fmt_text_constant_rows(
253 f,
254 rows.iter().map(|(row, diff)| (row, diff)),
255 ctx.as_mut(),
256 redacted,
257 )?;
258 *ctx.as_mut() -= 1;
259 } else {
260 writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
261 }
262 Ok(())
263 }
264 FastPathPlan::Constant(Err(err), _) => {
265 if redacted {
266 writeln!(f, "{}Error █", ctx.as_mut())
267 } else {
268 writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
269 }
270 }
271 FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
272 ctx.as_mut().set();
273 let (map, filter, project) = mfp.as_map_filter_project();
274
275 let cols = if !ctx.config.humanized_exprs {
276 None
277 } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
278 let cols = itertools::chain(
282 cols.iter().cloned(),
283 std::iter::repeat(String::new()).take(map.len()),
284 )
285 .collect();
286 Some(cols)
287 } else {
288 None
289 };
290
291 if project.len() != mfp.input_arity + map.len()
292 || !project.iter().enumerate().all(|(i, o)| i == *o)
293 {
294 let outputs = mode.seq(&project, cols.as_ref());
295 let outputs = CompactScalars(outputs);
296 writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
297 *ctx.as_mut() += 1;
298 }
299 if !filter.is_empty() {
300 let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
301 writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
302 *ctx.as_mut() += 1;
303 }
304 if !map.is_empty() {
305 let scalars = mode.seq(&map, cols.as_ref());
306 let scalars = CompactScalars(scalars);
307 writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
308 *ctx.as_mut() += 1;
309 }
310 MirRelationExpr::fmt_indexed_filter(
311 f,
312 ctx,
313 coll_id,
314 idx_id,
315 literal_constraints.clone(),
316 None,
317 )?;
318 writeln!(f)?;
319 ctx.as_mut().reset();
320 Ok(())
321 }
322 FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
323 ctx.as_mut().set();
324 let (map, filter, project) = mfp.as_map_filter_project();
325
326 let cols = if !ctx.config.humanized_exprs {
327 None
328 } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
329 let cols = itertools::chain(
330 cols.iter().cloned(),
331 std::iter::repeat(String::new()).take(map.len()),
332 )
333 .collect::<Vec<_>>();
334 Some(cols)
335 } else {
336 None
337 };
338
339 if project.len() != mfp.input_arity + map.len()
340 || !project.iter().enumerate().all(|(i, o)| i == *o)
341 {
342 let outputs = mode.seq(&project, cols.as_ref());
343 let outputs = CompactScalars(outputs);
344 writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
345 *ctx.as_mut() += 1;
346 }
347 if !filter.is_empty() {
348 let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
349 writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
350 *ctx.as_mut() += 1;
351 }
352 if !map.is_empty() {
353 let scalars = mode.seq(&map, cols.as_ref());
354 let scalars = CompactScalars(scalars);
355 writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
356 *ctx.as_mut() += 1;
357 }
358 let human_id = ctx
359 .humanizer
360 .humanize_id(*gid)
361 .unwrap_or_else(|| gid.to_string());
362 write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
363 if let Some(literal) = literal_constraint {
364 let value = mode.expr(literal, None);
365 writeln!(f, " [value={}]", value)?;
366 } else {
367 writeln!(f, "")?;
368 }
369 ctx.as_mut().reset();
370 Ok(())
371 }
372 }?;
373 Ok(())
374 }
375}
376
/// Everything needed to execute a peek: the plan, the timestamp
/// determination, and bookkeeping about the peek's sources.
#[derive(Debug)]
pub struct PlannedPeek {
    /// How to execute the peek: fast path or dedicated dataflow.
    pub plan: PeekPlan,
    /// The timestamp at which the peek reads.
    pub determination: TimestampDetermination<mz_repr::Timestamp>,
    /// The connection issuing the peek.
    pub conn_id: ConnectionId,
    /// Relation type of the intermediate (pre-finishing) peek result; used
    /// to build the result `RelationDesc` handed to the compute layer.
    pub intermediate_result_type: SqlRelationType,
    /// Arity of the peeked collection; used as the MFP input arity on the
    /// slow path.
    pub source_arity: usize,
    /// Global IDs of the collections the peek depends on.
    pub source_ids: BTreeSet<GlobalId>,
}
392
/// The two ways a peek can be executed: a [`FastPathPlan`] that avoids
/// building a dataflow, or a slow path that builds a dedicated dataflow and
/// peeks the resulting arrangement.
#[derive(Clone, Debug)]
pub enum PeekPlan<T = mz_repr::Timestamp> {
    /// A plan that can be executed without building a dataflow.
    FastPath(FastPathPlan),
    /// A plan that builds a dedicated dataflow for the peek.
    SlowPath(PeekDataflowPlan<T>),
}
400
401fn mfp_to_safe_plan(
406 mfp: mz_expr::MapFilterProject,
407) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
408 mfp.into_plan()
409 .map_err(OptimizerError::InternalUnsafeMfpPlan)?
410 .into_nontemporal()
411 .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
412}
413
414fn permute_oneshot_mfp_around_index(
416 mfp: mz_expr::MapFilterProject,
417 key: &[MirScalarExpr],
418) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
419 let input_arity = mfp.input_arity;
420 let mut safe_mfp = mfp_to_safe_plan(mfp)?;
421 let (permute, thinning) = permutation_for_arrangement(key, input_arity);
422 safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
423 Ok(safe_mfp)
424}
425
/// Attempts to derive a [`FastPathPlan`] from a peek's dataflow description,
/// returning `Ok(None)` when no fast path applies and the dataflow must be
/// built and executed.
///
/// Recognized fast paths (inspecting the single object to build for
/// `view_id`): a constant collection; an MFP applied to a `Get` of a
/// collection with an imported index (index peek) or, subject to filter /
/// finishing restrictions, a persist-backed source (persist peek); and a
/// join already implemented as an `IndexedFilter` (index lookup).
pub fn create_fast_path_plan<T: Timestamp>(
    dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
    view_id: GlobalId,
    finishing: Option<&RowSetFinishing>,
    persist_fast_path_limit: usize,
    persist_fast_path_order: bool,
) -> Result<Option<FastPathPlan>, OptimizerError> {
    // Only consider plans whose first object to build is the peeked view.
    if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
    {
        let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
        if let Some((rows, found_typ)) = mir.as_const() {
            // The view is a constant: its rows can be returned directly.
            let plan = FastPathPlan::Constant(
                rows.clone(),
                mz_repr::SqlRelationType::from_repr(found_typ),
            );
            return Ok(Some(plan));
        } else {
            // Strip off a leading TopK when the finishing already imposes at
            // least as strict an ordering/limit, so the remainder of the
            // expression can still qualify for a fast path.
            if let MirRelationExpr::TopK {
                input,
                group_key,
                order_key,
                limit,
                offset,
                monotonic: _,
                expected_group_size: _,
            } = mir
            {
                if let Some(finishing) = finishing {
                    if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
                        // The TopK is redundant iff its limit is no stricter
                        // than the finishing's limit.
                        let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
                            (None, _) => true,
                            (Some(..), None) => false,
                            (Some(topk_limit), Some(finishing_limit)) => {
                                if let Some(l) = topk_limit.as_literal_int64() {
                                    l >= *finishing_limit
                                } else {
                                    false
                                }
                            }
                        };
                        if finishing_limits_at_least_as_topk {
                            mir = input;
                        }
                    }
                }
            }
            // Peel off any Map/Filter/Project around the remaining expression.
            let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
            match mir {
                MirRelationExpr::Get {
                    id: Id::Global(get_id),
                    typ: repr_typ,
                    ..
                } => {
                    // Prefer an imported index on the Get's collection.
                    for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
                        if desc.on_id == *get_id {
                            return Ok(Some(FastPathPlan::PeekExisting(
                                *get_id,
                                *index_id,
                                None,
                                permute_oneshot_mfp_around_index(mfp, &desc.key)?,
                            )));
                        }
                    }

                    // No index: consider peeking persist directly.
                    let safe_mfp = mfp_to_safe_plan(mfp)?;
                    let (_maps, filters, projection) = safe_mfp.as_map_filter_project();

                    // When order-aware persist fast path is enabled, fetch the
                    // source's relation type to reason about column order.
                    let persist_fast_path_order_relation_typ = if persist_fast_path_order {
                        Some(
                            dataflow_plan
                                .source_imports
                                .get(get_id)
                                .expect("Get's ID is also imported")
                                .desc
                                .typ
                                .clone(),
                        )
                    } else {
                        None
                    };

                    // Try to extract a literal prefix constraint: a row of
                    // literals equated (by the filters) to a prefix of
                    // order-preserving leading columns.
                    let literal_constraint =
                        if let Some(relation_typ) = &persist_fast_path_order_relation_typ {
                            let mut row = Row::default();
                            let mut packer = row.packer();
                            for (idx, col) in relation_typ.column_types.iter().enumerate() {
                                if !preserves_order(&col.scalar_type) {
                                    break;
                                }
                                let col_expr = MirScalarExpr::column(idx);

                                let Some((literal, _)) = filters
                                    .iter()
                                    .filter_map(|f| f.expr_eq_literal(&col_expr))
                                    .next()
                                else {
                                    break;
                                };
                                packer.extend_by_row(&literal);
                            }
                            if row.is_empty() { None } else { Some(row) }
                        } else {
                            None
                        };

                    // A persist peek without a key constraint is only viable
                    // when the finishing bounds the amount of data read.
                    let finish_ok = match &finishing {
                        None => false,
                        Some(RowSetFinishing {
                            order_by,
                            limit,
                            offset,
                            ..
                        }) => {
                            // The requested order must match persist's native
                            // order on the projected columns (or be empty).
                            let order_ok =
                                if let Some(relation_typ) = &persist_fast_path_order_relation_typ {
                                    order_by.iter().enumerate().all(|(idx, order)| {
                                        let column_idx = projection[order.column];
                                        if column_idx >= safe_mfp.input_arity {
                                            return false;
                                        }
                                        let column_type = &relation_typ.column_types[column_idx];
                                        let index_ok = idx == column_idx;
                                        let nulls_ok = !column_type.nullable || order.nulls_last;
                                        let asc_ok = !order.desc;
                                        let type_ok = preserves_order(&column_type.scalar_type);
                                        index_ok && nulls_ok && asc_ok && type_ok
                                    })
                                } else {
                                    order_by.is_empty()
                                };
                            // Limit + offset must stay under the configured cap.
                            let limit_ok = limit.map_or(false, |l| {
                                usize::cast_from(l) + *offset < persist_fast_path_limit
                            });
                            order_ok && limit_ok
                        }
                    };

                    // The literal constraint pins at most one record iff some
                    // key of the relation lies entirely within the constrained
                    // prefix.
                    let key_constraint = if let Some(literal) = &literal_constraint {
                        let prefix_len = literal.iter().count();
                        repr_typ
                            .keys
                            .iter()
                            .any(|k| k.iter().all(|idx| *idx < prefix_len))
                    } else {
                        false
                    };

                    if key_constraint || (filters.is_empty() && finish_ok) {
                        return Ok(Some(FastPathPlan::PeekPersist(
                            *get_id,
                            literal_constraint,
                            safe_mfp,
                        )));
                    }
                }
                MirRelationExpr::Join { implementation, .. } => {
                    // A join already planned as an indexed filter is an index
                    // lookup with literal key values.
                    if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
                        implementation
                    {
                        return Ok(Some(FastPathPlan::PeekExisting(
                            *coll_id,
                            *idx_id,
                            Some(vals.clone()),
                            permute_oneshot_mfp_around_index(mfp, key)?,
                        )));
                    }
                }
                _ => {}
            }
        }
    }
    Ok(None)
}
626
627impl FastPathPlan {
628 pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
629 match self {
630 FastPathPlan::Constant(..) => UsedIndexes::default(),
631 FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
632 if literal_constraints.is_some() {
633 UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
634 } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
635 UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
636 } else {
637 UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
638 }
639 }
640 FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
641 }
642 }
643}
644
645impl crate::coord::Coordinator {
    /// Executes a planned peek and returns the response to send to the
    /// client.
    ///
    /// Constant plans are finished and returned immediately. All other plans
    /// issue a peek to the compute controller (building and later dropping a
    /// dedicated dataflow for slow-path plans) and return a streaming
    /// response fed by [`Self::create_peek_response_stream`].
    #[mz_ore::instrument(level = "debug")]
    pub async fn implement_peek_plan(
        &mut self,
        ctx_extra: &mut ExecuteContextGuard,
        plan: PlannedPeek,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let PlannedPeek {
            plan: fast_path,
            determination,
            conn_id,
            intermediate_result_type,
            source_arity,
            source_ids,
        } = plan;

        // Constant results need no interaction with the compute layer: finish
        // them here and return immediately.
        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
            let mut rows = match rows {
                Ok(rows) => rows,
                Err(e) => return Err(e.into()),
            };
            // Merge duplicate rows so multiplicities are well-defined.
            consolidate(&mut rows);

            let mut results = Vec::new();
            for (row, count) in rows {
                // After consolidation a negative multiplicity is an error.
                if count.is_negative() {
                    Err(EvalError::InvalidParameterValue(
                        format!("Negative multiplicity in constant result: {}", count).into(),
                    ))?
                };
                if count.is_positive() {
                    let count = usize::cast_from(
                        u64::try_from(count.into_inner())
                            .expect("known to be positive from check above"),
                    );
                    results.push((
                        row,
                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
                    ));
                }
            }
            let row_collection = RowCollection::new(results, &finishing.order_by);
            let duration_histogram = self.metrics.row_set_finishing_seconds();

            // Apply the finishing (order/limit/offset/project) and record the
            // statement execution outcome either way.
            let (ret, reason) = match finishing.finish(
                row_collection,
                max_result_size,
                max_returned_query_size,
                &duration_histogram,
            ) {
                Ok((rows, row_size_bytes)) => {
                    let result_size = u64::cast_from(row_size_bytes);
                    let rows_returned = u64::cast_from(rows.count());
                    (
                        Ok(Self::send_immediate_rows(rows)),
                        StatementEndedExecutionReason::Success {
                            result_size: Some(result_size),
                            rows_returned: Some(rows_returned),
                            execution_strategy: Some(StatementExecutionStrategy::Constant),
                        },
                    )
                }
                Err(error) => (
                    Err(AdapterError::ResultSize(error.clone())),
                    StatementEndedExecutionReason::Errored { error },
                ),
            };
            self.retire_execution(reason, std::mem::take(ctx_extra).defuse());
            return ret;
        }

        let timestamp = determination.timestamp_context.timestamp_or_default();
        if let Some(id) = ctx_extra.contents() {
            self.set_statement_execution_timestamp(id, timestamp)
        }

        // Build the peek command and target for the remaining plan kinds. For
        // the slow path this also creates the dataflow to peek.
        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
            PeekPlan::FastPath(FastPathPlan::PeekExisting(
                _coll_id,
                idx_id,
                literal_constraints,
                map_filter_project,
            )) => (
                (literal_constraints, timestamp, map_filter_project),
                None,
                true,
                PeekTarget::Index { id: idx_id },
                StatementExecutionStrategy::FastPath,
            ),
            PeekPlan::FastPath(FastPathPlan::PeekPersist(
                coll_id,
                literal_constraint,
                map_filter_project,
            )) => {
                let peek_command = (
                    literal_constraint.map(|r| vec![r]),
                    timestamp,
                    map_filter_project,
                );
                let metadata = self
                    .controller
                    .storage
                    .collection_metadata(coll_id)
                    .expect("storage collection for fast-path peek")
                    .clone();
                (
                    peek_command,
                    None,
                    true,
                    PeekTarget::Persist {
                        id: coll_id,
                        metadata,
                    },
                    StatementExecutionStrategy::PersistFastPath,
                )
            }
            PeekPlan::SlowPath(PeekDataflowPlan {
                desc: dataflow,
                id: index_id,
                key: index_key,
                permutation: index_permutation,
                thinned_arity: index_thinned_arity,
            }) => {
                let output_ids = dataflow.export_ids().collect();

                // Create the dataflow; it is dropped again further below,
                // once the peek has been issued.
                self.controller
                    .compute
                    .create_dataflow(compute_instance, dataflow, None)
                    .map_err(
                        AdapterError::concurrent_dependency_drop_from_dataflow_creation_error,
                    )?;
                self.initialize_compute_read_policies(
                    output_ids,
                    compute_instance,
                    CompactionWindow::DisableCompaction,
                )
                .await;

                // Permute an identity MFP so it applies to the arranged
                // (key + thinned value) representation of the new index.
                let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
                map_filter_project.permute_fn(
                    |c| index_permutation[c],
                    index_key.len() + index_thinned_arity,
                );
                let map_filter_project = mfp_to_safe_plan(map_filter_project)?;

                (
                    (None, timestamp, map_filter_project),
                    Some(index_id),
                    false,
                    PeekTarget::Index { id: index_id },
                    StatementExecutionStrategy::Standard,
                )
            }
            _ => {
                // The constant case returned early above.
                unreachable!()
            }
        };

        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();

        // Generate a fresh UUID not already used by a pending peek.
        let mut uuid = Uuid::new_v4();
        while self.pending_peeks.contains_key(&uuid) {
            uuid = Uuid::new_v4();
        }

        // Register the peek so responses and cancellations can find it.
        self.pending_peeks.insert(
            uuid,
            PendingPeek {
                conn_id: conn_id.clone(),
                cluster_id: compute_instance,
                depends_on: source_ids,
                ctx_extra: std::mem::take(ctx_extra),
                is_fast_path,
            },
        );
        self.client_pending_peeks
            .entry(conn_id)
            .or_default()
            .insert(uuid, compute_instance);
        let (literal_constraints, timestamp, map_filter_project) = peek_command;

        // Synthesize column names for the intermediate result descriptor.
        let peek_result_column_names =
            (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
        let peek_result_desc =
            RelationDesc::new(intermediate_result_type, peek_result_column_names);

        // Issue the peek to the compute controller.
        self.controller
            .compute
            .peek(
                compute_instance,
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                peek_result_desc,
                finishing.clone(),
                map_filter_project,
                target_replica,
                rows_tx,
            )
            .map_err(AdapterError::concurrent_dependency_drop_from_peek_error)?;

        let duration_histogram = self.metrics.row_set_finishing_seconds();

        // Slow path: the dataflow is only needed until the peek is issued.
        if let Some(index_id) = drop_dataflow {
            self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
            self.drop_compute_collections(vec![(compute_instance, index_id)]);
        }

        let persist_client = self.persist_client.clone();
        let peek_stash_read_batch_size_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                .get(self.catalog().system_config().dyncfgs());
        let peek_stash_read_memory_budget_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                .get(self.catalog().system_config().dyncfgs());

        // Convert the eventual controller response into a row stream.
        let peek_response_stream = Self::create_peek_response_stream(
            rows_rx,
            finishing,
            max_result_size,
            max_returned_query_size,
            duration_histogram,
            persist_client,
            peek_stash_read_batch_size_bytes,
            peek_stash_read_memory_budget_bytes,
        );

        Ok(crate::ExecuteResponse::SendingRowsStreaming {
            rows: Box::pin(peek_response_stream),
            instance_id: compute_instance,
            strategy,
        })
    }
907
908 #[mz_ore::instrument(level = "debug")]
912 pub(crate) fn create_peek_response_stream(
913 rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
914 finishing: RowSetFinishing,
915 max_result_size: u64,
916 max_returned_query_size: Option<u64>,
917 duration_histogram: prometheus::Histogram,
918 mut persist_client: mz_persist_client::PersistClient,
919 peek_stash_read_batch_size_bytes: usize,
920 peek_stash_read_memory_budget_bytes: usize,
921 ) -> impl futures::Stream<Item = PeekResponseUnary> {
922 async_stream::stream!({
923 let result = rows_rx.await;
924
925 let rows = match result {
926 Ok(rows) => rows,
927 Err(e) => {
928 yield PeekResponseUnary::Error(e.to_string());
929 return;
930 }
931 };
932
933 match rows {
934 PeekResponse::Rows(rows) => {
935 let rows = RowCollection::merge_sorted(&rows, &finishing.order_by);
936 match finishing.finish(
937 rows,
938 max_result_size,
939 max_returned_query_size,
940 &duration_histogram,
941 ) {
942 Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
943 Err(e) => yield PeekResponseUnary::Error(e),
944 }
945 }
946 PeekResponse::Stashed(response) => {
947 let response = *response;
948
949 let shard_id = response.shard_id;
950
951 let mut batches = Vec::new();
952 for proto_batch in response.batches.into_iter() {
953 let batch =
954 persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);
955
956 batches.push(batch);
957 }
958 tracing::trace!(?batches, "stashed peek response");
959
960 let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
961 let read_schemas: Schemas<SourceData, ()> = Schemas {
962 id: None,
963 key: Arc::new(response.relation_desc.clone()),
964 val: Arc::new(UnitSchema),
965 };
966
967 let mut row_cursor = persist_client
968 .read_batches_consolidated::<_, _, _, i64>(
969 response.shard_id,
970 as_of,
971 read_schemas,
972 batches,
973 |_stats| true,
974 peek_stash_read_memory_budget_bytes,
975 )
976 .await
977 .expect("invalid usage");
978
979 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
998 mz_ore::task::spawn(|| "read_peek_batches", async move {
999 for rows in response.inline_rows {
1009 let result = tx.send(rows).await;
1010 if result.is_err() {
1011 tracing::debug!("receiver went away");
1012 }
1013 }
1014
1015 let mut current_batch = Vec::new();
1016 let mut current_batch_size: usize = 0;
1017
1018 'outer: while let Some(rows) = row_cursor.next().await {
1019 for ((source_data, _val), _ts, diff) in rows {
1020 let row = source_data
1021 .0
1022 .expect("we are not sending errors on this code path");
1023
1024 let diff = usize::try_from(diff)
1025 .expect("peek responses cannot have negative diffs");
1026
1027 if diff > 0 {
1028 let diff =
1029 NonZeroUsize::new(diff).expect("checked to be non-zero");
1030 current_batch_size =
1031 current_batch_size.saturating_add(row.byte_len());
1032 current_batch.push((row, diff));
1033 }
1034
1035 if current_batch_size > peek_stash_read_batch_size_bytes {
1036 let result = tx
1042 .send(RowCollection::new(
1043 current_batch.drain(..).collect_vec(),
1044 &[],
1045 ))
1046 .await;
1047 if result.is_err() {
1048 tracing::debug!("receiver went away");
1049 break 'outer;
1052 }
1053
1054 current_batch_size = 0;
1055 }
1056 }
1057 }
1058
1059 if current_batch.len() > 0 {
1060 let result = tx.send(RowCollection::new(current_batch, &[])).await;
1061 if result.is_err() {
1062 tracing::debug!("receiver went away");
1063 }
1064 }
1065
1066 let batches = row_cursor.into_lease();
1067 tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
1068 for batch in batches {
1069 batch.delete().await;
1070 }
1071 });
1072
1073 assert!(
1074 finishing.is_streamable(response.relation_desc.arity()),
1075 "can only get stashed responses when the finishing is streamable"
1076 );
1077
1078 tracing::trace!("query result is streamable!");
1079
1080 assert!(finishing.is_streamable(response.relation_desc.arity()));
1081 let mut incremental_finishing = RowSetFinishingIncremental::new(
1082 finishing.offset,
1083 finishing.limit,
1084 finishing.project,
1085 max_returned_query_size,
1086 );
1087
1088 let mut got_zero_rows = true;
1089 while let Some(rows) = rx.recv().await {
1090 got_zero_rows = false;
1091
1092 let result_rows = incremental_finishing.finish_incremental(
1093 rows,
1094 max_result_size,
1095 &duration_histogram,
1096 );
1097
1098 match result_rows {
1099 Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
1100 Err(e) => yield PeekResponseUnary::Error(e),
1101 }
1102 }
1103
1104 if got_zero_rows {
1107 let row_iter = vec![].into_row_iter();
1108 yield PeekResponseUnary::Rows(Box::new(row_iter));
1109 }
1110 }
1111 PeekResponse::Canceled => {
1112 yield PeekResponseUnary::Canceled;
1113 }
1114 PeekResponse::Error(e) => {
1115 yield PeekResponseUnary::Error(e);
1116 }
1117 }
1118 })
1119 }
1120
1121 #[mz_ore::instrument(level = "debug")]
1123 pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
1124 if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
1125 self.metrics
1126 .canceled_peeks
1127 .inc_by(u64::cast_from(uuids.len()));
1128
1129 let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
1130 for (uuid, compute_instance) in &uuids {
1131 inverse.entry(*compute_instance).or_default().insert(*uuid);
1132 }
1133 for (compute_instance, uuids) in inverse {
1134 for uuid in uuids {
1139 let _ = self.controller.compute.cancel_peek(
1140 compute_instance,
1141 uuid,
1142 PeekResponse::Canceled,
1143 );
1144 }
1145 }
1146
1147 let peeks = uuids
1148 .iter()
1149 .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
1150 .collect::<Vec<_>>();
1151 for peek in peeks {
1152 self.retire_execution(
1153 StatementEndedExecutionReason::Canceled,
1154 peek.ctx_extra.defuse(),
1155 );
1156 }
1157 }
1158 }
1159
1160 pub(crate) fn handle_peek_notification(
1163 &mut self,
1164 uuid: Uuid,
1165 notification: PeekNotification,
1166 otel_ctx: OpenTelemetryContext,
1167 ) {
1168 if let Some(PendingPeek {
1171 conn_id: _,
1172 cluster_id: _,
1173 depends_on: _,
1174 ctx_extra,
1175 is_fast_path,
1176 }) = self.remove_pending_peek(&uuid)
1177 {
1178 let reason = match notification {
1179 PeekNotification::Success {
1180 rows: num_rows,
1181 result_size,
1182 } => {
1183 let strategy = if is_fast_path {
1184 StatementExecutionStrategy::FastPath
1185 } else {
1186 StatementExecutionStrategy::Standard
1187 };
1188 StatementEndedExecutionReason::Success {
1189 result_size: Some(result_size),
1190 rows_returned: Some(num_rows),
1191 execution_strategy: Some(strategy),
1192 }
1193 }
1194 PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
1195 PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
1196 };
1197 otel_ctx.attach_as_parent();
1198 self.retire_execution(reason, ctx_extra.defuse());
1199 }
1200 }
1203
1204 pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
1206 let pending_peek = self.pending_peeks.remove(uuid);
1207 if let Some(pending_peek) = &pending_peek {
1208 let uuids = self
1209 .client_pending_peeks
1210 .get_mut(&pending_peek.conn_id)
1211 .expect("coord peek state is inconsistent");
1212 uuids.remove(uuid);
1213 if uuids.is_empty() {
1214 self.client_pending_peeks.remove(&pending_peek.conn_id);
1215 }
1216 }
1217 pending_peek
1218 }
1219
1220 pub(crate) async fn implement_slow_path_peek(
1226 &mut self,
1227 dataflow_plan: PeekDataflowPlan<mz_repr::Timestamp>,
1228 determination: TimestampDetermination<mz_repr::Timestamp>,
1229 finishing: RowSetFinishing,
1230 compute_instance: ComputeInstanceId,
1231 target_replica: Option<ReplicaId>,
1232 intermediate_result_type: SqlRelationType,
1233 source_ids: BTreeSet<GlobalId>,
1234 conn_id: ConnectionId,
1235 max_result_size: u64,
1236 max_query_result_size: Option<u64>,
1237 watch_set: Option<WatchSetCreation>,
1238 ) -> Result<ExecuteResponse, AdapterError> {
1239 let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
1244 if let Some(ws) = watch_set {
1245 self.install_peek_watch_sets(conn_id.clone(), ws)
1246 .map_err(|e| {
1247 AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e)
1248 })?;
1249 }
1250
1251 let source_arity = intermediate_result_type.arity();
1252
1253 let planned_peek = PlannedPeek {
1254 plan: PeekPlan::SlowPath(dataflow_plan),
1255 determination,
1256 conn_id,
1257 intermediate_result_type,
1258 source_arity,
1259 source_ids,
1260 };
1261
1262 self.implement_peek_plan(
1267 &mut ExecuteContextGuard::new(statement_logging_id, self.internal_cmd_tx.clone()),
1268 planned_peek,
1269 finishing,
1270 compute_instance,
1271 target_replica,
1272 max_result_size,
1273 max_query_result_size,
1274 )
1275 .await
1276 }
1277
1278 pub(crate) async fn implement_copy_to(
1291 &mut self,
1292 df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
1293 compute_instance: ComputeInstanceId,
1294 target_replica: Option<ReplicaId>,
1295 source_ids: BTreeSet<GlobalId>,
1296 conn_id: ConnectionId,
1297 watch_set: Option<WatchSetCreation>,
1298 tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
1299 ) {
1300 let send_err = |tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
1302 e: AdapterError| {
1303 let _ = tx.send(Err(e));
1304 };
1305
1306 if let Some(ws) = watch_set {
1310 if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
1311 let err = AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e);
1312 send_err(tx, err);
1313 return;
1314 }
1315 }
1316
1317 let sink_id = df_desc.sink_id();
1321
1322 let (sink_tx, sink_rx) = oneshot::channel();
1326 let active_copy_to = ActiveCopyTo {
1327 conn_id: conn_id.clone(),
1328 tx: sink_tx,
1329 cluster_id: compute_instance,
1330 depends_on: source_ids,
1331 };
1332
1333 drop(
1335 self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
1336 .await,
1337 );
1338
1339 if let Err(e) = self
1342 .try_ship_dataflow(df_desc, compute_instance, target_replica)
1343 .await
1344 .map_err(AdapterError::concurrent_dependency_drop_from_dataflow_creation_error)
1345 {
1346 self.remove_active_compute_sink(sink_id).await;
1350 send_err(tx, e);
1351 return;
1352 }
1353
1354 let span = Span::current();
1359 task::spawn(
1360 || "copy to completion",
1361 async move {
1362 let res = sink_rx.await;
1363 let result = match res {
1364 Ok(res) => res,
1365 Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
1366 };
1367
1368 let _ = tx.send(result);
1369 }
1370 .instrument(span),
1371 );
1372 }
1373
1374 pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
1378 where
1379 I: IntoRowIterator,
1380 I::Iter: Send + Sync + 'static,
1381 {
1382 let rows = Box::new(rows.into_row_iter());
1383 ExecuteResponse::SendingRowsImmediate { rows }
1384 }
1385}
1386
#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{Datum, SqlColumnType, SqlScalarType};

    use super::*;

    /// Golden-string test for the text (`EXPLAIN`-style) rendering of
    /// `FastPathPlan` variants: a `Constant` carrying an error, a
    /// `PeekExisting` with no key lookup (full scan), a `PeekExisting` with a
    /// literal key lookup, and `Constant` row sets both below and above the
    /// threshold at which rendering switches to a row-count summary.
    ///
    /// NOTE(review): the `[DELETED INDEX]` text in the expectations
    /// presumably appears because `DummyHumanizer` cannot resolve the index
    /// `GlobalId`s used here — confirm against the renderer.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn test_fast_path_plan_as_text() {
        // Single non-nullable string column; shared by all constant plans.
        let typ = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: SqlScalarType::String,
            nullable: false,
        }]);
        // Constant plan whose evaluation produced an error.
        let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
        // Index peek with no literal key values (the `None`): a full scan,
        // with a map + project applied on top.
        let no_lookup = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            MapFilterProject::new(4)
                .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
                .project([1, 4])
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );
        // Index peek constrained to the literal key `(5)`, with a filter.
        let lookup = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            MapFilterProject::new(3)
                .filter(Some(
                    MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
                ))
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );

        let humanizer = DummyHumanizer;
        let config = ExplainConfig {
            redacted: false,
            verbose_syntax: true,
            ..Default::default()
        };
        // `text_string_at` consumes the context, so build a fresh one per call.
        let ctx_gen = || {
            let indent = Indent::default();
            let annotations = BTreeMap::new();
            PlanRenderingContext::<FastPathPlan>::new(
                indent,
                &humanizer,
                annotations,
                &config,
                BTreeSet::default(),
            )
        };

        let constant_err_exp = "Error \"division by zero\"\n";
        let no_lookup_exp = "Project (#1, #4)\n Map ((#0 OR #2))\n ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
        let lookup_exp =
            "Filter (#0) IS NULL\n ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";

        assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
        assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
        assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);

        // Three rows with multiplicities 1, 2, and 500: small enough that the
        // renderer lists every row (with `x N` for diffs > 1).
        let mut constant_rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let constant_exp1 =
            "Constant\n - (\"hello\")\n - ((\"world\") x 2)\n - ((\"star\") x 500)\n";
        assert_eq!(
            text_string_at(
                &FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
                ctx_gen
            ),
            constant_exp1
        );
        // Push the row count past the elision threshold: the renderer now
        // emits a total (sum of |diff|s = 1 + 2 + 500 + 20 = 523) and only the
        // first 20 rows.
        constant_rows
            .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
        let constant_exp2 = "Constant\n total_rows (diffs absed): 523\n first_rows:\n - (\"hello\")\
        \n - ((\"world\") x 2)\n - ((\"star\") x 500)\n - (\"0\")\n - (\"1\")\
        \n - (\"2\")\n - (\"3\")\n - (\"4\")\n - (\"5\")\n - (\"6\")\
        \n - (\"7\")\n - (\"8\")\n - (\"9\")\n - (\"10\")\n - (\"11\")\
        \n - (\"12\")\n - (\"13\")\n - (\"14\")\n - (\"15\")\n - (\"16\")\n";
        assert_eq!(
            text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
            constant_exp2
        );
    }
}