1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt;
17use std::num::NonZeroUsize;
18use std::ops::Deref;
19use std::sync::Arc;
20
21use differential_dataflow::consolidation::consolidate;
22use itertools::Itertools;
23use mz_adapter_types::compaction::CompactionWindow;
24use mz_adapter_types::connection::ConnectionId;
25use mz_cluster_client::ReplicaId;
26use mz_compute_client::controller::PeekNotification;
27use mz_compute_client::protocol::command::PeekTarget;
28use mz_compute_client::protocol::response::PeekResponse;
29use mz_compute_types::ComputeInstanceId;
30use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
31use mz_controller_types::ClusterId;
32use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
33use mz_expr::row::RowCollection;
34use mz_expr::{
35 EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
36 RowSetFinishingIncremental, permutation_for_arrangement,
37};
38use mz_ore::cast::CastFrom;
39use mz_ore::str::{StrExt, separated};
40use mz_ore::task;
41use mz_ore::tracing::OpenTelemetryContext;
42use mz_persist_client::Schemas;
43use mz_persist_types::codec_impls::UnitSchema;
44use mz_repr::explain::text::DisplayText;
45use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
46use mz_repr::{
47 Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, SqlRelationType,
48 preserves_order,
49};
50use mz_storage_types::sources::SourceData;
51use serde::{Deserialize, Serialize};
52use timely::progress::Antichain;
53use tokio::sync::oneshot;
54use tracing::{Instrument, Span};
55use uuid::Uuid;
56
57use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
58use crate::coord::timestamp_selection::TimestampDetermination;
59use crate::optimize::OptimizerError;
60use crate::statement_logging::WatchSetCreation;
61use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
62use crate::{AdapterError, ExecuteContextGuard, ExecuteResponse};
63
/// State retained for a peek that is in flight, used to route the eventual
/// response and to clean up when the peek completes or is canceled.
#[derive(Debug)]
pub(crate) struct PendingPeek {
    /// The connection that issued the peek.
    pub(crate) conn_id: ConnectionId,
    /// The cluster (compute instance) executing the peek.
    pub(crate) cluster_id: ClusterId,
    /// IDs of the collections this peek depends on.
    pub(crate) depends_on: BTreeSet<GlobalId>,
    /// Execution-context state consumed for statement logging when the peek
    /// is retired.
    pub(crate) ctx_extra: ExecuteContextGuard,
    /// Whether the peek took a fast path (index or persist read) rather than
    /// building a dedicated dataflow; used to report the execution strategy.
    pub(crate) is_fast_path: bool,
}
79
/// A peek response destined for a single client, produced after merging and
/// finishing the raw [`PeekResponse`]s.
#[derive(Debug)]
pub enum PeekResponseUnary {
    /// The (possibly partial, when streaming) rows satisfying the peek.
    Rows(Box<dyn RowIterator + Send + Sync>),
    /// An error, rendered as a string.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
90
/// A slow-path plan for a peek: a dataflow to build, plus the arrangement key
/// information needed to read the result back out of it.
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan {
    /// The dataflow that computes the peek's result.
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan, ()>,
    /// The ID of the index (arrangement) the peek reads from; also the ID of
    /// the dataflow to drop once the peek has been issued.
    pub(crate) id: GlobalId,
    /// The key by which the result arrangement is keyed.
    key: Vec<MirScalarExpr>,
    /// Permutation mapping arranged columns back to the source column order.
    permutation: Vec<usize>,
    /// Number of columns in the thinned (non-key) portion of the arrangement.
    thinned_arity: usize,
}
99
100impl PeekDataflowPlan {
101 pub fn new(
102 desc: DataflowDescription<mz_compute_types::plan::Plan, ()>,
103 id: GlobalId,
104 typ: &SqlRelationType,
105 ) -> Self {
106 let arity = typ.arity();
107 let key = typ
108 .default_key()
109 .into_iter()
110 .map(MirScalarExpr::column)
111 .collect::<Vec<_>>();
112 let (permutation, thinning) = permutation_for_arrangement(&key, arity);
113 Self {
114 desc,
115 id,
116 key,
117 permutation,
118 thinned_arity: thinning.len(),
119 }
120 }
121}
122
/// Ways a peek can be answered without building a dedicated dataflow.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    /// The result is a constant collection (or a constant evaluation error);
    /// nothing needs to be read at all.
    Constant(Result<Vec<(Row, Diff)>, EvalError>, SqlRelationType),
    /// Read an existing index: collection ID, index ID, optional literal key
    /// values to look up, and a safe map/filter/project to apply.
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    /// Read directly from persist: collection ID, optional literal prefix
    /// constraint, and a safe map/filter/project to apply.
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}
136
137impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
138 fn fmt_text(
139 &self,
140 f: &mut fmt::Formatter<'_>,
141 ctx: &mut PlanRenderingContext<'a, T>,
142 ) -> fmt::Result {
143 if ctx.config.verbose_syntax {
144 self.fmt_verbose_text(f, ctx)
145 } else {
146 self.fmt_default_text(f, ctx)
147 }
148 }
149}
150
151impl FastPathPlan {
    /// Renders this plan in the default (non-verbose) `EXPLAIN` text syntax.
    pub fn fmt_default_text<'a, T>(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        let mode = HumanizedExplain::new(ctx.config.redacted);

        match self {
            FastPathPlan::Constant(rows, _) => {
                write!(f, "{}→Constant ", ctx.indent)?;

                match rows {
                    Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
                    Err(err) => {
                        // Hide error details when redaction is requested.
                        if mode.redacted() {
                            writeln!(f, "(error: █)")?;
                        } else {
                            writeln!(f, "(error: {})", err.to_string().quoted(),)?;
                        }
                    }
                }
            }
            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
                // Prefer humanized names; fall back to the raw IDs.
                let coll = ctx
                    .humanizer
                    .humanize_id(*coll_id)
                    .unwrap_or_else(|| coll_id.to_string());
                let idx = ctx
                    .humanizer
                    .humanize_id(*idx_id)
                    .unwrap_or_else(|| idx_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                // Remember the current indentation; restored via `reset` below.
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                // Indent one level further if the MFP printed anything.
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraints) = literal_constraints {
                    writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
                    ctx.indent += 1;
                    let values = separated("; ", mode.seq(literal_constraints, None));
                    writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
                }

                ctx.indent.reset();
            }
            FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
                let coll = ctx
                    .humanizer
                    .humanize_id(*global_id)
                    .unwrap_or_else(|| global_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                // Remember the current indentation; restored via `reset` below.
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                // Indent one level further if the MFP printed anything.
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraint) = literal_constraint {
                    writeln!(f, "{}→ReadStorage Lookup on {coll}", ctx.indent)?;
                    ctx.indent += 1;
                    let value = mode.expr(literal_constraint, None);
                    writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→ReadStorage {coll}", ctx.indent)?;
                }

                ctx.indent.reset();
            }
        }

        Ok(())
    }
236
237 pub fn fmt_verbose_text<'a, T>(
238 &self,
239 f: &mut fmt::Formatter<'_>,
240 ctx: &mut PlanRenderingContext<'a, T>,
241 ) -> fmt::Result {
242 let redacted = ctx.config.redacted;
243 let mode = HumanizedExplain::new(redacted);
244
245 match self {
248 FastPathPlan::Constant(Ok(rows), _) => {
249 if !rows.is_empty() {
250 writeln!(f, "{}Constant", ctx.indent)?;
251 *ctx.as_mut() += 1;
252 fmt_text_constant_rows(
253 f,
254 rows.iter().map(|(row, diff)| (row, diff)),
255 ctx.as_mut(),
256 redacted,
257 )?;
258 *ctx.as_mut() -= 1;
259 } else {
260 writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
261 }
262 Ok(())
263 }
264 FastPathPlan::Constant(Err(err), _) => {
265 if redacted {
266 writeln!(f, "{}Error █", ctx.as_mut())
267 } else {
268 writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
269 }
270 }
271 FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
272 ctx.as_mut().set();
273 let (map, filter, project) = mfp.as_map_filter_project();
274
275 let cols = if !ctx.config.humanized_exprs {
276 None
277 } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
278 let cols = itertools::chain(
282 cols.iter().cloned(),
283 std::iter::repeat(String::new()).take(map.len()),
284 )
285 .collect();
286 Some(cols)
287 } else {
288 None
289 };
290
291 if project.len() != mfp.input_arity + map.len()
292 || !project.iter().enumerate().all(|(i, o)| i == *o)
293 {
294 let outputs = mode.seq(&project, cols.as_ref());
295 let outputs = CompactScalars(outputs);
296 writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
297 *ctx.as_mut() += 1;
298 }
299 if !filter.is_empty() {
300 let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
301 writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
302 *ctx.as_mut() += 1;
303 }
304 if !map.is_empty() {
305 let scalars = mode.seq(&map, cols.as_ref());
306 let scalars = CompactScalars(scalars);
307 writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
308 *ctx.as_mut() += 1;
309 }
310 MirRelationExpr::fmt_indexed_filter(
311 f,
312 ctx,
313 coll_id,
314 idx_id,
315 literal_constraints.clone(),
316 None,
317 )?;
318 writeln!(f)?;
319 ctx.as_mut().reset();
320 Ok(())
321 }
322 FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
323 ctx.as_mut().set();
324 let (map, filter, project) = mfp.as_map_filter_project();
325
326 let cols = if !ctx.config.humanized_exprs {
327 None
328 } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
329 let cols = itertools::chain(
330 cols.iter().cloned(),
331 std::iter::repeat(String::new()).take(map.len()),
332 )
333 .collect::<Vec<_>>();
334 Some(cols)
335 } else {
336 None
337 };
338
339 if project.len() != mfp.input_arity + map.len()
340 || !project.iter().enumerate().all(|(i, o)| i == *o)
341 {
342 let outputs = mode.seq(&project, cols.as_ref());
343 let outputs = CompactScalars(outputs);
344 writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
345 *ctx.as_mut() += 1;
346 }
347 if !filter.is_empty() {
348 let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
349 writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
350 *ctx.as_mut() += 1;
351 }
352 if !map.is_empty() {
353 let scalars = mode.seq(&map, cols.as_ref());
354 let scalars = CompactScalars(scalars);
355 writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
356 *ctx.as_mut() += 1;
357 }
358 let human_id = ctx
359 .humanizer
360 .humanize_id(*gid)
361 .unwrap_or_else(|| gid.to_string());
362 write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
363 if let Some(literal) = literal_constraint {
364 let value = mode.expr(literal, None);
365 writeln!(f, " [value={}]", value)?;
366 } else {
367 writeln!(f, "")?;
368 }
369 ctx.as_mut().reset();
370 Ok(())
371 }
372 }?;
373 Ok(())
374 }
375}
376
/// Everything needed to execute a peek that has already been planned and
/// timestamped.
#[derive(Debug)]
pub struct PlannedPeek {
    /// How the peek will be executed (fast path or dedicated dataflow).
    pub plan: PeekPlan,
    /// The timestamp (and how it was determined) at which to execute the peek.
    pub determination: TimestampDetermination,
    /// The connection issuing the peek.
    pub conn_id: ConnectionId,
    /// Relation type of the rows produced by the peek itself, used to build
    /// the result `RelationDesc` handed to the compute controller.
    pub intermediate_result_type: SqlRelationType,
    /// Arity of the peek's source; used to size the map/filter/project for
    /// slow-path peeks.
    pub source_arity: usize,
    /// IDs of the collections the peek depends on.
    pub source_ids: BTreeSet<GlobalId>,
}
392
/// The two ways a peek can be executed.
#[derive(Clone, Debug)]
pub enum PeekPlan {
    /// Serve the peek without building a new dataflow.
    FastPath(FastPathPlan),
    /// Build (and later drop) a dedicated dataflow for the peek.
    SlowPath(PeekDataflowPlan),
}
400
401fn mfp_to_safe_plan(
406 mfp: mz_expr::MapFilterProject,
407) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
408 mfp.into_plan()
409 .map_err(OptimizerError::InternalUnsafeMfpPlan)?
410 .into_nontemporal()
411 .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
412}
413
414fn permute_oneshot_mfp_around_index(
416 mfp: mz_expr::MapFilterProject,
417 key: &[MirScalarExpr],
418) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
419 let input_arity = mfp.input_arity;
420 let mut safe_mfp = mfp_to_safe_plan(mfp)?;
421 let (permute, thinning) = permutation_for_arrangement(key, input_arity);
422 safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
423 Ok(safe_mfp)
424}
425
/// Attempts to convert a dataflow-backed peek into a [`FastPathPlan`] that can
/// be answered without building the dataflow.
///
/// Returns `Ok(Some(_))` when the dataflow's first object to build (which must
/// be `view_id`) reduces to one of:
/// * a constant collection or constant error (`Constant`),
/// * a map/filter/project over a `Get` that an imported index covers
///   (`PeekExisting`),
/// * a map/filter/project over a `Get` that can be read directly from persist
///   (`PeekPersist`), subject to `persist_fast_path_limit` and, when
///   `persist_fast_path_order` is set, order/literal-prefix checks,
/// * a join planned as an `IndexedFilter`, i.e. a literal index lookup
///   (`PeekExisting`).
///
/// Returns `Ok(None)` when no fast path applies.
pub fn create_fast_path_plan(
    dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr>,
    view_id: GlobalId,
    finishing: Option<&RowSetFinishing>,
    persist_fast_path_limit: usize,
    persist_fast_path_order: bool,
) -> Result<Option<FastPathPlan>, OptimizerError> {
    // Only consider the dataflow when its first object to build is the view
    // being peeked.
    if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
    {
        let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
        if let Some((rows, found_typ)) = mir.as_const() {
            // A constant expression needs no reads at all.
            let plan = FastPathPlan::Constant(
                rows.clone(),
                mz_repr::SqlRelationType::from_repr(found_typ),
            );
            return Ok(Some(plan));
        } else {
            // If the root is a TopK that the finishing subsumes (no grouping,
            // same ordering, no offset, and a limit at least as large), peel
            // it off and consider the TopK's input instead.
            if let MirRelationExpr::TopK {
                input,
                group_key,
                order_key,
                limit,
                offset,
                monotonic: _,
                expected_group_size: _,
            } = mir
            {
                if let Some(finishing) = finishing {
                    if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
                        let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
                            (None, _) => true,
                            (Some(..), None) => false,
                            (Some(topk_limit), Some(finishing_limit)) => {
                                // Only literal TopK limits can be compared.
                                if let Some(l) = topk_limit.as_literal_int64() {
                                    l >= *finishing_limit
                                } else {
                                    false
                                }
                            }
                        };
                        if finishing_limits_at_least_as_topk {
                            mir = input;
                        }
                    }
                }
            }
            // Strip a possible map/filter/project off the remaining expression.
            let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
            match mir {
                MirRelationExpr::Get {
                    id: Id::Global(get_id),
                    typ: repr_typ,
                    ..
                } => {
                    // Prefer any imported index on the collection: a full scan
                    // of the index with the MFP applied.
                    for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
                        if desc.on_id == *get_id {
                            return Ok(Some(FastPathPlan::PeekExisting(
                                *get_id,
                                *index_id,
                                None,
                                permute_oneshot_mfp_around_index(mfp, &desc.key)?,
                            )));
                        }
                    }

                    // No index available: consider reading from persist.
                    let safe_mfp = mfp_to_safe_plan(mfp)?;
                    let (_maps, filters, projection) = safe_mfp.as_map_filter_project();

                    // With order-aware persist fast path enabled, we need the
                    // imported source's relation type for the checks below.
                    let persist_fast_path_order_relation_typ = if persist_fast_path_order {
                        Some(
                            dataflow_plan
                                .source_imports
                                .get(get_id)
                                .expect("Get's ID is also imported")
                                .desc
                                .typ
                                .clone(),
                        )
                    } else {
                        None
                    };

                    // Pack a literal prefix constraint from equality filters on
                    // a leading run of order-preserving columns, stopping at
                    // the first column without such a filter.
                    let literal_constraint =
                        if let Some(relation_typ) = &persist_fast_path_order_relation_typ {
                            let mut row = Row::default();
                            let mut packer = row.packer();
                            for (idx, col) in relation_typ.column_types.iter().enumerate() {
                                if !preserves_order(&col.scalar_type) {
                                    break;
                                }
                                let col_expr = MirScalarExpr::column(idx);

                                let Some((literal, _)) = filters
                                    .iter()
                                    .filter_map(|f| f.expr_eq_literal(&col_expr))
                                    .next()
                                else {
                                    break;
                                };
                                packer.extend_by_row(&literal);
                            }
                            if row.is_empty() { None } else { Some(row) }
                        } else {
                            None
                        };

                    // Decide whether the finishing can be applied as part of
                    // the persist read.
                    let finish_ok = match &finishing {
                        None => false,
                        Some(RowSetFinishing {
                            order_by,
                            limit,
                            offset,
                            ..
                        }) => {
                            // Each requested ordering column must be ascending,
                            // in column-prefix position, nulls-last when
                            // nullable, of an order-preserving type, and not a
                            // column introduced by the MFP's map stage.
                            let order_ok =
                                if let Some(relation_typ) = &persist_fast_path_order_relation_typ {
                                    order_by.iter().enumerate().all(|(idx, order)| {
                                        let column_idx = projection[order.column];
                                        if column_idx >= safe_mfp.input_arity {
                                            return false;
                                        }
                                        let column_type = &relation_typ.column_types[column_idx];
                                        let index_ok = idx == column_idx;
                                        let nulls_ok = !column_type.nullable || order.nulls_last;
                                        let asc_ok = !order.desc;
                                        let type_ok = preserves_order(&column_type.scalar_type);
                                        index_ok && nulls_ok && asc_ok && type_ok
                                    })
                                } else {
                                    order_by.is_empty()
                                };
                            // Only small result prefixes qualify.
                            let limit_ok = limit.map_or(false, |l| {
                                usize::cast_from(l) + *offset < persist_fast_path_limit
                            });
                            order_ok && limit_ok
                        }
                    };

                    // The literal prefix is a key constraint when it covers all
                    // columns of some candidate key of the collection.
                    let key_constraint = if let Some(literal) = &literal_constraint {
                        let prefix_len = literal.iter().count();
                        repr_typ
                            .keys
                            .iter()
                            .any(|k| k.iter().all(|idx| *idx < prefix_len))
                    } else {
                        false
                    };

                    // Read from persist if the literal pins a full key, or if
                    // there are no filters and the finishing bounds the read.
                    if key_constraint || (filters.is_empty() && finish_ok) {
                        return Ok(Some(FastPathPlan::PeekPersist(
                            *get_id,
                            literal_constraint,
                            safe_mfp,
                        )));
                    }
                }
                MirRelationExpr::Join { implementation, .. } => {
                    // A join planned as an `IndexedFilter` is a literal lookup
                    // against an existing index.
                    if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
                        implementation
                    {
                        return Ok(Some(FastPathPlan::PeekExisting(
                            *coll_id,
                            *idx_id,
                            Some(vals.clone()),
                            permute_oneshot_mfp_around_index(mfp, key)?,
                        )));
                    }
                }
                _ => {}
            }
        }
    }
    Ok(None)
}
626
627impl FastPathPlan {
628 pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
629 match self {
630 FastPathPlan::Constant(..) => UsedIndexes::default(),
631 FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
632 if literal_constraints.is_some() {
633 UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
634 } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
635 UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
636 } else {
637 UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
638 }
639 }
640 FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
641 }
642 }
643}
644
645impl crate::coord::Coordinator {
    /// Executes a [`PlannedPeek`]: constant plans are finished and answered
    /// immediately; all other plans issue a peek to the compute controller
    /// (installing — and then dropping — a dedicated dataflow first for
    /// slow-path plans) and return a streaming response.
    #[mz_ore::instrument(level = "debug")]
    pub async fn implement_peek_plan(
        &mut self,
        ctx_extra: &mut ExecuteContextGuard,
        plan: PlannedPeek,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let PlannedPeek {
            plan: fast_path,
            determination,
            conn_id,
            intermediate_result_type,
            source_arity,
            source_ids,
        } = plan;

        // Constant plans never reach the compute layer: consolidate the rows,
        // apply the finishing, and return immediately.
        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
            let mut rows = match rows {
                Ok(rows) => rows,
                Err(e) => return Err(e.into()),
            };
            consolidate(&mut rows);

            let mut results = Vec::new();
            for (row, count) in rows {
                // A negative multiplicity cannot be rendered as output rows.
                if count.is_negative() {
                    Err(EvalError::InvalidParameterValue(
                        format!("Negative multiplicity in constant result: {}", count).into(),
                    ))?
                };
                if count.is_positive() {
                    let count = usize::cast_from(
                        u64::try_from(count.into_inner())
                            .expect("known to be positive from check above"),
                    );
                    results.push((
                        row,
                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
                    ));
                }
            }
            let row_collection = RowCollection::new(results, &finishing.order_by);
            let duration_histogram = self.metrics.row_set_finishing_seconds();

            let (ret, reason) = match finishing.finish(
                row_collection,
                max_result_size,
                max_returned_query_size,
                &duration_histogram,
            ) {
                Ok((rows, row_size_bytes)) => {
                    let result_size = u64::cast_from(row_size_bytes);
                    let rows_returned = u64::cast_from(rows.count());
                    (
                        Ok(Self::send_immediate_rows(rows)),
                        StatementEndedExecutionReason::Success {
                            result_size: Some(result_size),
                            rows_returned: Some(rows_returned),
                            execution_strategy: Some(StatementExecutionStrategy::Constant),
                        },
                    )
                }
                Err(error) => (
                    Err(AdapterError::ResultSize(error.clone())),
                    StatementEndedExecutionReason::Errored { error },
                ),
            };
            // Constant peeks are fully executed here, so retire them now.
            self.retire_execution(reason, std::mem::take(ctx_extra).defuse());
            return ret;
        }

        // Record the chosen execution timestamp for statement logging.
        let timestamp = determination.timestamp_context.timestamp_or_default();
        if let Some(id) = ctx_extra.contents() {
            self.set_statement_execution_timestamp(id, timestamp)
        }

        // Translate the plan into (a) the peek command's literal constraints,
        // timestamp, and MFP, (b) an optional dataflow to drop after issuing
        // the peek, (c) the fast-path flag, (d) the peek target, and (e) the
        // execution strategy for statement logging.
        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
            PeekPlan::FastPath(FastPathPlan::PeekExisting(
                _coll_id,
                idx_id,
                literal_constraints,
                map_filter_project,
            )) => (
                (literal_constraints, timestamp, map_filter_project),
                None,
                true,
                PeekTarget::Index { id: idx_id },
                StatementExecutionStrategy::FastPath,
            ),
            PeekPlan::FastPath(FastPathPlan::PeekPersist(
                coll_id,
                literal_constraint,
                map_filter_project,
            )) => {
                let peek_command = (
                    literal_constraint.map(|r| vec![r]),
                    timestamp,
                    map_filter_project,
                );
                let metadata = self
                    .controller
                    .storage
                    .collection_metadata(coll_id)
                    .expect("storage collection for fast-path peek")
                    .clone();
                (
                    peek_command,
                    None,
                    true,
                    PeekTarget::Persist {
                        id: coll_id,
                        metadata,
                    },
                    StatementExecutionStrategy::PersistFastPath,
                )
            }
            PeekPlan::SlowPath(PeekDataflowPlan {
                desc: dataflow,
                id: index_id,
                key: index_key,
                permutation: index_permutation,
                thinned_arity: index_thinned_arity,
            }) => {
                let output_ids = dataflow.export_ids().collect();

                // Install the dedicated dataflow and disable compaction on
                // its outputs while the peek is in flight.
                self.controller
                    .compute
                    .create_dataflow(compute_instance, dataflow, None)
                    .map_err(
                        AdapterError::concurrent_dependency_drop_from_dataflow_creation_error,
                    )?;
                self.initialize_compute_read_policies(
                    output_ids,
                    compute_instance,
                    CompactionWindow::DisableCompaction,
                )
                .await;

                // Build an MFP that undoes the arrangement's permutation of
                // the source columns.
                let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
                map_filter_project.permute_fn(
                    |c| index_permutation[c],
                    index_key.len() + index_thinned_arity,
                );
                let map_filter_project = mfp_to_safe_plan(map_filter_project)?;

                (
                    (None, timestamp, map_filter_project),
                    Some(index_id),
                    false,
                    PeekTarget::Index { id: index_id },
                    StatementExecutionStrategy::Standard,
                )
            }
            PeekPlan::FastPath(_) => {
                // `Constant` plans were fully handled above.
                unreachable!()
            }
        };

        // Channel on which the compute controller delivers the peek response.
        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();

        // Choose a UUID not already in use by another pending peek.
        let mut uuid = Uuid::new_v4();
        while self.pending_peeks.contains_key(&uuid) {
            uuid = Uuid::new_v4();
        }

        let (literal_constraints, timestamp, map_filter_project) = peek_command;

        // Synthesize placeholder column names (`peek_{i}`) for the peek's
        // result descriptor.
        let peek_result_column_names =
            (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
        let peek_result_desc =
            RelationDesc::new(intermediate_result_type, peek_result_column_names);

        self.controller
            .compute
            .peek(
                compute_instance,
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                peek_result_desc,
                finishing.clone(),
                map_filter_project,
                target_replica,
                rows_tx,
            )
            .map_err(AdapterError::concurrent_dependency_drop_from_peek_error)?;

        // Track the peek so responses and cancellations can be routed to it.
        self.pending_peeks.insert(
            uuid,
            PendingPeek {
                conn_id: conn_id.clone(),
                cluster_id: compute_instance,
                depends_on: source_ids,
                ctx_extra: std::mem::take(ctx_extra),
                is_fast_path,
            },
        );
        self.client_pending_peeks
            .entry(conn_id)
            .or_default()
            .insert(uuid, compute_instance);

        let duration_histogram = self.metrics.row_set_finishing_seconds();

        // For slow-path peeks, drop the dedicated dataflow now that the peek
        // command has been issued.
        if let Some(index_id) = drop_dataflow {
            self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
            self.drop_compute_collections(vec![(compute_instance, index_id)]);
        }

        // Configuration for reading stashed (persist-backed) peek responses.
        let persist_client = self.persist_client.clone();
        let peek_stash_read_batch_size_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                .get(self.catalog().system_config().dyncfgs());
        let peek_stash_read_memory_budget_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                .get(self.catalog().system_config().dyncfgs());

        let peek_response_stream = Self::create_peek_response_stream(
            rows_rx,
            finishing,
            max_result_size,
            max_returned_query_size,
            duration_histogram,
            persist_client,
            peek_stash_read_batch_size_bytes,
            peek_stash_read_memory_budget_bytes,
        );

        Ok(crate::ExecuteResponse::SendingRowsStreaming {
            rows: Box::pin(peek_response_stream),
            instance_id: compute_instance,
            strategy,
        })
    }
909
910 #[mz_ore::instrument(level = "debug")]
914 pub(crate) fn create_peek_response_stream(
915 rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
916 finishing: RowSetFinishing,
917 max_result_size: u64,
918 max_returned_query_size: Option<u64>,
919 duration_histogram: prometheus::Histogram,
920 mut persist_client: mz_persist_client::PersistClient,
921 peek_stash_read_batch_size_bytes: usize,
922 peek_stash_read_memory_budget_bytes: usize,
923 ) -> impl futures::Stream<Item = PeekResponseUnary> {
924 async_stream::stream!({
925 let result = rows_rx.await;
926
927 let rows = match result {
928 Ok(rows) => rows,
929 Err(e) => {
930 yield PeekResponseUnary::Error(e.to_string());
931 return;
932 }
933 };
934
935 match rows {
936 PeekResponse::Rows(rows) => {
937 let rows = RowCollection::merge_sorted(&rows, &finishing.order_by);
938 match finishing.finish(
939 rows,
940 max_result_size,
941 max_returned_query_size,
942 &duration_histogram,
943 ) {
944 Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
945 Err(e) => yield PeekResponseUnary::Error(e),
946 }
947 }
948 PeekResponse::Stashed(response) => {
949 let response = *response;
950
951 let shard_id = response.shard_id;
952
953 let mut batches = Vec::new();
954 for proto_batch in response.batches.into_iter() {
955 let batch =
956 persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);
957
958 batches.push(batch);
959 }
960 tracing::trace!(?batches, "stashed peek response");
961
962 let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
963 let read_schemas: Schemas<SourceData, ()> = Schemas {
964 id: None,
965 key: Arc::new(response.relation_desc.clone()),
966 val: Arc::new(UnitSchema),
967 };
968
969 let mut row_cursor = persist_client
970 .read_batches_consolidated::<_, _, _, i64>(
971 response.shard_id,
972 as_of,
973 read_schemas,
974 batches,
975 |_stats| true,
976 peek_stash_read_memory_budget_bytes,
977 )
978 .await
979 .expect("invalid usage");
980
981 let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1000 mz_ore::task::spawn(|| "read_peek_batches", async move {
1001 for rows in response.inline_rows {
1011 let result = tx.send(rows).await;
1012 if result.is_err() {
1013 tracing::debug!("receiver went away");
1014 }
1015 }
1016
1017 let mut current_batch = Vec::new();
1018 let mut current_batch_size: usize = 0;
1019
1020 'outer: while let Some(rows) = row_cursor.next().await {
1021 for ((source_data, _val), _ts, diff) in rows {
1022 let row = source_data
1023 .0
1024 .expect("we are not sending errors on this code path");
1025
1026 let diff = usize::try_from(diff)
1027 .expect("peek responses cannot have negative diffs");
1028
1029 if diff > 0 {
1030 let diff =
1031 NonZeroUsize::new(diff).expect("checked to be non-zero");
1032 current_batch_size =
1033 current_batch_size.saturating_add(row.byte_len());
1034 current_batch.push((row, diff));
1035 }
1036
1037 if current_batch_size > peek_stash_read_batch_size_bytes {
1038 let result = tx
1044 .send(RowCollection::new(
1045 current_batch.drain(..).collect_vec(),
1046 &[],
1047 ))
1048 .await;
1049 if result.is_err() {
1050 tracing::debug!("receiver went away");
1051 break 'outer;
1054 }
1055
1056 current_batch_size = 0;
1057 }
1058 }
1059 }
1060
1061 if current_batch.len() > 0 {
1062 let result = tx.send(RowCollection::new(current_batch, &[])).await;
1063 if result.is_err() {
1064 tracing::debug!("receiver went away");
1065 }
1066 }
1067
1068 let batches = row_cursor.into_lease();
1069 tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
1070 for batch in batches {
1071 batch.delete().await;
1072 }
1073 });
1074
1075 assert!(
1076 finishing.is_streamable(response.relation_desc.arity()),
1077 "can only get stashed responses when the finishing is streamable"
1078 );
1079
1080 tracing::trace!("query result is streamable!");
1081
1082 assert!(finishing.is_streamable(response.relation_desc.arity()));
1083 let mut incremental_finishing = RowSetFinishingIncremental::new(
1084 finishing.offset,
1085 finishing.limit,
1086 finishing.project,
1087 max_returned_query_size,
1088 );
1089
1090 let mut got_zero_rows = true;
1091 while let Some(rows) = rx.recv().await {
1092 got_zero_rows = false;
1093
1094 let result_rows = incremental_finishing.finish_incremental(
1095 rows,
1096 max_result_size,
1097 &duration_histogram,
1098 );
1099
1100 match result_rows {
1101 Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
1102 Err(e) => yield PeekResponseUnary::Error(e),
1103 }
1104 }
1105
1106 if got_zero_rows {
1109 let row_iter = vec![].into_row_iter();
1110 yield PeekResponseUnary::Rows(Box::new(row_iter));
1111 }
1112 }
1113 PeekResponse::Canceled => {
1114 yield PeekResponseUnary::Canceled;
1115 }
1116 PeekResponse::Error(e) => {
1117 yield PeekResponseUnary::Error(e);
1118 }
1119 }
1120 })
1121 }
1122
1123 #[mz_ore::instrument(level = "debug")]
1125 pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
1126 if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
1127 self.metrics
1128 .canceled_peeks
1129 .inc_by(u64::cast_from(uuids.len()));
1130
1131 let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
1132 for (uuid, compute_instance) in &uuids {
1133 inverse.entry(*compute_instance).or_default().insert(*uuid);
1134 }
1135 for (compute_instance, uuids) in inverse {
1136 for uuid in uuids {
1141 let _ = self.controller.compute.cancel_peek(
1142 compute_instance,
1143 uuid,
1144 PeekResponse::Canceled,
1145 );
1146 }
1147 }
1148
1149 let peeks = uuids
1150 .iter()
1151 .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
1152 .collect::<Vec<_>>();
1153 for peek in peeks {
1154 self.retire_execution(
1155 StatementEndedExecutionReason::Canceled,
1156 peek.ctx_extra.defuse(),
1157 );
1158 }
1159 }
1160 }
1161
1162 pub(crate) fn handle_peek_notification(
1165 &mut self,
1166 uuid: Uuid,
1167 notification: PeekNotification,
1168 otel_ctx: OpenTelemetryContext,
1169 ) {
1170 if let Some(PendingPeek {
1173 conn_id: _,
1174 cluster_id: _,
1175 depends_on: _,
1176 ctx_extra,
1177 is_fast_path,
1178 }) = self.remove_pending_peek(&uuid)
1179 {
1180 let reason = match notification {
1181 PeekNotification::Success {
1182 rows: num_rows,
1183 result_size,
1184 } => {
1185 let strategy = if is_fast_path {
1186 StatementExecutionStrategy::FastPath
1187 } else {
1188 StatementExecutionStrategy::Standard
1189 };
1190 StatementEndedExecutionReason::Success {
1191 result_size: Some(result_size),
1192 rows_returned: Some(num_rows),
1193 execution_strategy: Some(strategy),
1194 }
1195 }
1196 PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
1197 PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
1198 };
1199 otel_ctx.attach_as_parent();
1200 self.retire_execution(reason, ctx_extra.defuse());
1201 }
1202 }
1205
1206 pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
1208 let pending_peek = self.pending_peeks.remove(uuid);
1209 if let Some(pending_peek) = &pending_peek {
1210 let uuids = self
1211 .client_pending_peeks
1212 .get_mut(&pending_peek.conn_id)
1213 .expect("coord peek state is inconsistent");
1214 uuids.remove(uuid);
1215 if uuids.is_empty() {
1216 self.client_pending_peeks.remove(&pending_peek.conn_id);
1217 }
1218 }
1219 pending_peek
1220 }
1221
    /// Implements a peek that could not take a fast path: installs optional
    /// statement-logging watch sets, wraps the dataflow plan in a
    /// [`PeekPlan::SlowPath`], and hands it to
    /// [`Self::implement_peek_plan`].
    pub(crate) async fn implement_slow_path_peek(
        &mut self,
        dataflow_plan: PeekDataflowPlan,
        determination: TimestampDetermination,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        intermediate_result_type: SqlRelationType,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        max_result_size: u64,
        max_query_result_size: Option<u64>,
        watch_set: Option<WatchSetCreation>,
    ) -> Result<ExecuteResponse, AdapterError> {
        // Remember the logging ID before the watch set is consumed below.
        let statement_logging_id = watch_set.as_ref().map(|ws| ws.logging_id);
        if let Some(ws) = watch_set {
            self.install_peek_watch_sets(conn_id.clone(), ws)
                .map_err(|e| {
                    AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e)
                })?;
        }

        let source_arity = intermediate_result_type.arity();

        let planned_peek = PlannedPeek {
            plan: PeekPlan::SlowPath(dataflow_plan),
            determination,
            conn_id,
            intermediate_result_type,
            source_arity,
            source_ids,
        };

        self.implement_peek_plan(
            &mut ExecuteContextGuard::new(statement_logging_id, self.internal_cmd_tx.clone()),
            planned_peek,
            finishing,
            compute_instance,
            target_replica,
            max_result_size,
            max_query_result_size,
        )
        .await
    }
1279
    /// Ships a `COPY ... TO` dataflow and arranges for its completion result
    /// to be delivered on `tx`.
    ///
    /// All failures (watch-set installation, dataflow shipping, or the sink's
    /// completion channel being dropped) are reported through `tx` rather
    /// than returned.
    pub(crate) async fn implement_copy_to(
        &mut self,
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        watch_set: Option<WatchSetCreation>,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    ) {
        // Helper to report an error to the waiting client.
        let send_err = |tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
                        e: AdapterError| {
            let _ = tx.send(Err(e));
        };

        // Install statement-logging watch sets, if requested.
        if let Some(ws) = watch_set {
            if let Err(e) = self.install_peek_watch_sets(conn_id.clone(), ws) {
                let err = AdapterError::concurrent_dependency_drop_from_watch_set_install_error(e);
                send_err(tx, err);
                return;
            }
        }

        let sink_id = df_desc.sink_id();

        // Register the sink as active *before* shipping the dataflow, so its
        // completion (sent on `sink_tx`) has somewhere to go.
        let (sink_tx, sink_rx) = oneshot::channel();
        let active_copy_to = ActiveCopyTo {
            conn_id: conn_id.clone(),
            tx: sink_tx,
            cluster_id: compute_instance,
            depends_on: source_ids,
        };

        drop(
            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
                .await,
        );

        // Ship the dataflow; on failure, deregister the sink again and report
        // the error.
        if let Err(e) = self
            .try_ship_dataflow(df_desc, compute_instance, target_replica)
            .await
            .map_err(AdapterError::concurrent_dependency_drop_from_dataflow_creation_error)
        {
            self.remove_active_compute_sink(sink_id).await;
            send_err(tx, e);
            return;
        }

        // Forward the sink's completion to the client from a task, without
        // blocking the coordinator.
        let span = Span::current();
        task::spawn(
            || "copy to completion",
            async move {
                let res = sink_rx.await;
                let result = match res {
                    Ok(res) => res,
                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
                };

                let _ = tx.send(result);
            }
            .instrument(span),
        );
    }
1375
1376 pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
1380 where
1381 I: IntoRowIterator,
1382 I::Iter: Send + Sync + 'static,
1383 {
1384 let rows = Box::new(rows.into_row_iter());
1385 ExecuteResponse::SendingRowsImmediate { rows }
1386 }
1387}
1388
#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{Datum, SqlColumnType, SqlScalarType};

    use super::*;

    // Verifies the EXPLAIN text rendering of `FastPathPlan` variants:
    // constant-error, full-scan index peek, key-lookup index peek, and
    // constant row sets both below and above the row-elision threshold.
    // Expected strings are byte-exact and must track the renderer's output.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn test_fast_path_plan_as_text() {
        // Single non-nullable string column; shared by all plans below.
        let typ = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: SqlScalarType::String,
            nullable: false,
        }]);
        // A constant plan carrying an evaluation error instead of rows.
        let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
        // Index peek with no lookup key: renders as a full scan with a
        // map + project MFP on top.
        let no_lookup = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            MapFilterProject::new(4)
                .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
                .project([1, 4])
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );
        // Index peek constrained to a single lookup value, with a filter MFP.
        let lookup = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            MapFilterProject::new(3)
                .filter(Some(
                    MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
                ))
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );

        // DummyHumanizer resolves no names, which is why the index name
        // renders as "[DELETED INDEX]" in the expected strings below.
        let humanizer = DummyHumanizer;
        let config = ExplainConfig {
            redacted: false,
            verbose_syntax: true,
            ..Default::default()
        };
        // Fresh rendering context per assertion (Indent is stateful).
        let ctx_gen = || {
            let indent = Indent::default();
            let annotations = BTreeMap::new();
            PlanRenderingContext::<FastPathPlan>::new(
                indent,
                &humanizer,
                annotations,
                &config,
                BTreeSet::default(),
            )
        };

        let constant_err_exp = "Error \"division by zero\"\n";
        let no_lookup_exp = "Project (#1, #4)\n Map ((#0 OR #2))\n ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
        let lookup_exp =
            "Filter (#0) IS NULL\n ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";

        assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
        assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
        assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);

        // Three rows with diffs 1, 2, and 500 — few enough that every row is
        // listed verbatim (multiplicity shown as "x N" when diff > 1).
        let mut constant_rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let constant_exp1 =
            "Constant\n - (\"hello\")\n - ((\"world\") x 2)\n - ((\"star\") x 500)\n";
        assert_eq!(
            text_string_at(
                &FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
                ctx_gen
            ),
            constant_exp1
        );
        // Push the row count past the renderer's elision threshold: now only
        // a diff total (1 + 2 + 500 + 20 = 523) and a prefix of rows appear.
        constant_rows
            .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
        let constant_exp2 = "Constant\n total_rows (diffs absed): 523\n first_rows:\n - (\"hello\")\
            \n - ((\"world\") x 2)\n - ((\"star\") x 500)\n - (\"0\")\n - (\"1\")\
            \n - (\"2\")\n - (\"3\")\n - (\"4\")\n - (\"5\")\n - (\"6\")\
            \n - (\"7\")\n - (\"8\")\n - (\"9\")\n - (\"10\")\n - (\"11\")\
            \n - (\"12\")\n - (\"13\")\n - (\"14\")\n - (\"15\")\n - (\"16\")\n";
        assert_eq!(
            text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
            constant_exp2
        );
    }
}