use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::num::NonZeroUsize;
use std::ops::Deref;
use std::sync::Arc;

use differential_dataflow::consolidation::consolidate;
use itertools::Itertools;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_cluster_client::ReplicaId;
use mz_compute_client::controller::PeekNotification;
use mz_compute_client::protocol::command::PeekTarget;
use mz_compute_client::protocol::response::PeekResponse;
use mz_compute_types::ComputeInstanceId;
use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
use mz_compute_types::sinks::ComputeSinkConnection;
use mz_controller_types::ClusterId;
use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
use mz_expr::row::RowCollection;
use mz_expr::{
    EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
    RowSetFinishingIncremental, permutation_for_arrangement,
};
use mz_ore::cast::CastFrom;
use mz_ore::str::{StrExt, separated};
use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_persist_client::Schemas;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::explain::text::DisplayText;
use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
use mz_repr::{
    Diff, GlobalId, IntoRowIterator, RelationDesc, Row, RowIterator, SqlRelationType,
    preserves_order,
};
use mz_storage_types::sources::SourceData;
use serde::{Deserialize, Serialize};
use timely::progress::{Antichain, Timestamp};
use tokio::sync::oneshot;
use tracing::{Instrument, Span};
use uuid::Uuid;

use crate::active_compute_sink::{ActiveComputeSink, ActiveCopyTo};
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::optimize::OptimizerError;
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
use crate::util::ResultExt;
use crate::{AdapterError, ExecuteContextExtra, ExecuteResponse};

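/// A peek that has been issued to a cluster and is still awaiting a response.
///
/// The coordinator tracks these so that it can route responses back to the
/// issuing connection, cancel them, and retire statement logging when they
/// complete.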
#[derive(Debug)]
pub(crate) struct PendingPeek {
    pub(crate) conn_id: ConnectionId,
    pub(crate) cluster_id: ClusterId,
    pub(crate) depends_on: BTreeSet<GlobalId>,
    pub(crate) ctx_extra: ExecuteContextExtra,
    pub(crate) is_fast_path: bool,
}

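/// The consolidated form of a peek response handed back to the client: either
/// an iterator over result rows, an error message, or a cancellation notice.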
#[derive(Debug)]
pub enum PeekResponseUnary {
    Rows(Box<dyn RowIterator + Send + Sync>),
    Error(String),
    Canceled,
}

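/// Everything needed to peek a dataflow that must first be built on a cluster
/// (the "slow path"): the dataflow description, the id of the index to read
/// back, and the key, permutation, and thinned arity used to arrange its
/// output.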
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    pub(crate) id: GlobalId,
    key: Vec<MirScalarExpr>,
    permutation: Vec<usize>,
    thinned_arity: usize,
}

impl<T> PeekDataflowPlan<T> {
    pub fn new(
        desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
        id: GlobalId,
        typ: &SqlRelationType,
    ) -> Self {
        let arity = typ.arity();
        let key = typ
            .default_key()
            .into_iter()
            .map(MirScalarExpr::column)
            .collect::<Vec<_>>();
        let (permutation, thinning) = permutation_for_arrangement(&key, arity);
        Self {
            desc,
            id,
            key,
            permutation,
            thinned_arity: thinning.len(),
        }
    }
}

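/// A plan for a peek that can be answered without building a new dataflow:
///
/// * `Constant`: the result is a constant collection (or a constant error).
/// * `PeekExisting`: read an existing arrangement, identified by collection id
///   and index id, with optional literal key constraints and a
///   map/filter/project to apply.
/// * `PeekPersist`: read a collection directly from persist, with an optional
///   literal prefix constraint and a map/filter/project to apply.
///
/// For illustration, the verbose `EXPLAIN` rendering of a `PeekExisting` plan
/// looks roughly like the following (adapted from the unit test at the bottom
/// of this file):
///
/// ```text
/// Project (#1, #4)
///   Map ((#0 OR #2))
///     ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]
/// ```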
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    Constant(Result<Vec<(Row, Diff)>, EvalError>, SqlRelationType),
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}

impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
    fn fmt_text(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        if ctx.config.verbose_syntax {
            self.fmt_verbose_text(f, ctx)
        } else {
            self.fmt_default_text(f, ctx)
        }
    }
}

impl FastPathPlan {
    pub fn fmt_default_text<'a, T>(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        let mode = HumanizedExplain::new(ctx.config.redacted);

        match self {
            FastPathPlan::Constant(rows, _) => {
                write!(f, "{}→Constant ", ctx.indent)?;

                match rows {
                    Ok(rows) => writeln!(f, "({} rows)", rows.len())?,
                    Err(err) => {
                        if mode.redacted() {
                            writeln!(f, "(error: █)")?;
                        } else {
                            writeln!(f, "(error: {})", err.to_string().quoted())?;
                        }
                    }
                }
            }
            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
                let coll = ctx
                    .humanizer
                    .humanize_id(*coll_id)
                    .unwrap_or_else(|| coll_id.to_string());
                let idx = ctx
                    .humanizer
                    .humanize_id(*idx_id)
                    .unwrap_or_else(|| idx_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraints) = literal_constraints {
                    writeln!(f, "{}→Index Lookup on {coll} (using {idx})", ctx.indent)?;
                    ctx.indent += 1;
                    let values = separated("; ", mode.seq(literal_constraints, None));
                    writeln!(f, "{}Lookup values: {values}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→Indexed {coll} (using {idx})", ctx.indent)?;
                }

                ctx.indent.reset();
            }
            FastPathPlan::PeekPersist(global_id, literal_constraint, mfp) => {
                let coll = ctx
                    .humanizer
                    .humanize_id(*global_id)
                    .unwrap_or_else(|| global_id.to_string());
                writeln!(f, "{}→Map/Filter/Project", ctx.indent)?;
                ctx.indent.set();

                ctx.indent += 1;

                mode.expr(mfp.deref(), None).fmt_default_text(f, ctx)?;
                let printed = !mfp.expressions.is_empty() || !mfp.predicates.is_empty();

                if printed {
                    ctx.indent += 1;
                }
                if let Some(literal_constraint) = literal_constraint {
                    writeln!(f, "{}→Index Lookup on {coll} (from storage)", ctx.indent)?;
                    ctx.indent += 1;
                    let value = mode.expr(literal_constraint, None);
                    writeln!(f, "{}Lookup value: {value}", ctx.indent)?;
                } else {
                    writeln!(f, "{}→Indexed {coll} (from storage)", ctx.indent)?;
                }

                ctx.indent.reset();
            }
        }

        Ok(())
    }

    pub fn fmt_verbose_text<'a, T>(
        &self,
        f: &mut fmt::Formatter<'_>,
        ctx: &mut PlanRenderingContext<'a, T>,
    ) -> fmt::Result {
        let redacted = ctx.config.redacted;
        let mode = HumanizedExplain::new(redacted);

        match self {
            FastPathPlan::Constant(Ok(rows), _) => {
                if !rows.is_empty() {
                    writeln!(f, "{}Constant", ctx.indent)?;
                    *ctx.as_mut() += 1;
                    fmt_text_constant_rows(
                        f,
                        rows.iter().map(|(row, diff)| (row, diff)),
                        ctx.as_mut(),
                        redacted,
                    )?;
                    *ctx.as_mut() -= 1;
                } else {
                    writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
                }
                Ok(())
            }
            FastPathPlan::Constant(Err(err), _) => {
                if redacted {
                    writeln!(f, "{}Error █", ctx.as_mut())
                } else {
                    writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
                }
            }
            FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
                ctx.as_mut().set();
                let (map, filter, project) = mfp.as_map_filter_project();

                let cols = if !ctx.config.humanized_exprs {
                    None
                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
                    let cols = itertools::chain(
                        cols.iter().cloned(),
                        std::iter::repeat(String::new()).take(map.len()),
                    )
                    .collect();
                    Some(cols)
                } else {
                    None
                };

                if project.len() != mfp.input_arity + map.len()
                    || !project.iter().enumerate().all(|(i, o)| i == *o)
                {
                    let outputs = mode.seq(&project, cols.as_ref());
                    let outputs = CompactScalars(outputs);
                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
                    *ctx.as_mut() += 1;
                }
                if !filter.is_empty() {
                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
                    *ctx.as_mut() += 1;
                }
                if !map.is_empty() {
                    let scalars = mode.seq(&map, cols.as_ref());
                    let scalars = CompactScalars(scalars);
                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
                    *ctx.as_mut() += 1;
                }
                MirRelationExpr::fmt_indexed_filter(
                    f,
                    ctx,
                    coll_id,
                    idx_id,
                    literal_constraints.clone(),
                    None,
                )?;
                writeln!(f)?;
                ctx.as_mut().reset();
                Ok(())
            }
            FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
                ctx.as_mut().set();
                let (map, filter, project) = mfp.as_map_filter_project();

                let cols = if !ctx.config.humanized_exprs {
                    None
                } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
                    let cols = itertools::chain(
                        cols.iter().cloned(),
                        std::iter::repeat(String::new()).take(map.len()),
                    )
                    .collect::<Vec<_>>();
                    Some(cols)
                } else {
                    None
                };

                if project.len() != mfp.input_arity + map.len()
                    || !project.iter().enumerate().all(|(i, o)| i == *o)
                {
                    let outputs = mode.seq(&project, cols.as_ref());
                    let outputs = CompactScalars(outputs);
                    writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
                    *ctx.as_mut() += 1;
                }
                if !filter.is_empty() {
                    let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
                    writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
                    *ctx.as_mut() += 1;
                }
                if !map.is_empty() {
                    let scalars = mode.seq(&map, cols.as_ref());
                    let scalars = CompactScalars(scalars);
                    writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
                    *ctx.as_mut() += 1;
                }
                let human_id = ctx
                    .humanizer
                    .humanize_id(*gid)
                    .unwrap_or_else(|| gid.to_string());
                write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
                if let Some(literal) = literal_constraint {
                    let value = mode.expr(literal, None);
                    writeln!(f, " [value={}]", value)?;
                } else {
                    writeln!(f)?;
                }
                ctx.as_mut().reset();
                Ok(())
            }
        }?;
        Ok(())
    }
}

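/// A fully specified peek: the plan to execute, the timestamp determination at
/// which to execute it, and the connection and source information needed to
/// track it. The `intermediate_result_type` is the relation type of the rows
/// the cluster returns, before the `RowSetFinishing` is applied.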
#[derive(Debug)]
pub struct PlannedPeek {
    pub plan: PeekPlan,
    pub determination: TimestampDetermination<mz_repr::Timestamp>,
    pub conn_id: ConnectionId,
    pub intermediate_result_type: SqlRelationType,
    pub source_arity: usize,
    pub source_ids: BTreeSet<GlobalId>,
}

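/// The two ways a peek can be executed: against an existing collection
/// (`FastPath`) or by first constructing a dedicated dataflow (`SlowPath`).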
#[derive(Clone, Debug)]
pub enum PeekPlan<T = mz_repr::Timestamp> {
    FastPath(FastPathPlan),
    SlowPath(PeekDataflowPlan<T>),
}

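/// Convert a `MapFilterProject` into a `SafeMfpPlan`, reporting an internal
/// optimizer error if the expression is unsafe or contains temporal operators.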
fn mfp_to_safe_plan(
    mfp: mz_expr::MapFilterProject,
) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
    mfp.into_plan()
        .map_err(OptimizerError::InternalUnsafeMfpPlan)?
        .into_nontemporal()
        .map_err(|e| OptimizerError::InternalUnsafeMfpPlan(format!("{:?}", e)))
}

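/// Adapt a `MapFilterProject` written against the unarranged collection so
/// that it can be applied to rows read from an arrangement with the given
/// `key`, by permuting its column references around the key and the thinned
/// remainder of the row.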
fn permute_oneshot_mfp_around_index(
    mfp: mz_expr::MapFilterProject,
    key: &[MirScalarExpr],
) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
    let input_arity = mfp.input_arity;
    let mut safe_mfp = mfp_to_safe_plan(mfp)?;
    let (permute, thinning) = permutation_for_arrangement(key, input_arity);
    safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
    Ok(safe_mfp)
}

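/// Determine whether the dataflow plan for `view_id` can be answered without
/// building a dataflow, and if so return the corresponding [`FastPathPlan`].
///
/// In rough terms, after peeling off a `TopK` that is subsumed by the provided
/// `finishing` and extracting a leading map/filter/project, the function
/// recognizes the following shapes (see the match below for the precise
/// conditions):
///
/// * a constant collection, which becomes `FastPathPlan::Constant`;
/// * a `Get` of a collection for which an index import exists, which becomes
///   `FastPathPlan::PeekExisting`;
/// * a `Get` of a storage collection where either a literal prefix pins down a
///   unique key, or there are no residual filters and the finishing's limit
///   plus offset fit under `persist_fast_path_limit`, which becomes
///   `FastPathPlan::PeekPersist`;
/// * a join that the optimizer already turned into an `IndexedFilter`, which
///   also becomes `FastPathPlan::PeekExisting` with literal lookup values.
///
/// Anything else returns `Ok(None)` and must go through the slow path.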
pub fn create_fast_path_plan<T: Timestamp>(
    dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
    view_id: GlobalId,
    finishing: Option<&RowSetFinishing>,
    persist_fast_path_limit: usize,
    persist_fast_path_order: bool,
) -> Result<Option<FastPathPlan>, OptimizerError> {
    if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
    {
        let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
        if let Some((rows, found_typ)) = mir.as_const() {
            return Ok(Some(FastPathPlan::Constant(
                rows.clone()
                    .map(|rows| rows.into_iter().map(|(row, diff)| (row, diff)).collect()),
                found_typ.clone(),
            )));
        } else {
            if let MirRelationExpr::TopK {
                input,
                group_key,
                order_key,
                limit,
                offset,
                monotonic: _,
                expected_group_size: _,
            } = mir
            {
                if let Some(finishing) = finishing {
                    if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
                        let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
                            (None, _) => true,
                            (Some(..), None) => false,
                            (Some(topk_limit), Some(finishing_limit)) => {
                                if let Some(l) = topk_limit.as_literal_int64() {
                                    l >= *finishing_limit
                                } else {
                                    false
                                }
                            }
                        };
                        if finishing_limits_at_least_as_topk {
                            mir = input;
                        }
                    }
                }
            }
            let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
            match mir {
                MirRelationExpr::Get {
                    id: Id::Global(get_id),
                    typ: relation_typ,
                    ..
                } => {
                    for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
                        if desc.on_id == *get_id {
                            return Ok(Some(FastPathPlan::PeekExisting(
                                *get_id,
                                *index_id,
                                None,
                                permute_oneshot_mfp_around_index(mfp, &desc.key)?,
                            )));
                        }
                    }

                    let safe_mfp = mfp_to_safe_plan(mfp)?;
                    let (_maps, filters, projection) = safe_mfp.as_map_filter_project();

                    let literal_constraint = if persist_fast_path_order {
                        let mut row = Row::default();
                        let mut packer = row.packer();
                        for (idx, col) in relation_typ.column_types.iter().enumerate() {
                            if !preserves_order(&col.scalar_type) {
                                break;
                            }
                            let col_expr = MirScalarExpr::column(idx);

                            let Some((literal, _)) = filters
                                .iter()
                                .filter_map(|f| f.expr_eq_literal(&col_expr))
                                .next()
                            else {
                                break;
                            };
                            packer.extend_by_row(&literal);
                        }
                        if row.is_empty() { None } else { Some(row) }
                    } else {
                        None
                    };

                    let finish_ok = match &finishing {
                        None => false,
                        Some(RowSetFinishing {
                            order_by,
                            limit,
                            offset,
                            ..
                        }) => {
                            let order_ok = if persist_fast_path_order {
                                order_by.iter().enumerate().all(|(idx, order)| {
                                    let column_idx = projection[order.column];
                                    if column_idx >= safe_mfp.input_arity {
                                        return false;
                                    }
                                    let column_type = &relation_typ.column_types[column_idx];
                                    let index_ok = idx == column_idx;
                                    let nulls_ok = !column_type.nullable || order.nulls_last;
                                    let asc_ok = !order.desc;
                                    let type_ok = preserves_order(&column_type.scalar_type);
                                    index_ok && nulls_ok && asc_ok && type_ok
                                })
                            } else {
                                order_by.is_empty()
                            };
                            let limit_ok = limit.map_or(false, |l| {
                                usize::cast_from(l) + *offset < persist_fast_path_limit
                            });
                            order_ok && limit_ok
                        }
                    };

                    let key_constraint = if let Some(literal) = &literal_constraint {
                        let prefix_len = literal.iter().count();
                        relation_typ
                            .keys
                            .iter()
                            .any(|k| k.iter().all(|idx| *idx < prefix_len))
                    } else {
                        false
                    };

                    if key_constraint || (filters.is_empty() && finish_ok) {
                        return Ok(Some(FastPathPlan::PeekPersist(
                            *get_id,
                            literal_constraint,
                            safe_mfp,
                        )));
                    }
                }
                MirRelationExpr::Join { implementation, .. } => {
                    if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
                        implementation
                    {
                        return Ok(Some(FastPathPlan::PeekExisting(
                            *coll_id,
                            *idx_id,
                            Some(vals.clone()),
                            permute_oneshot_mfp_around_index(mfp, key)?,
                        )));
                    }
                }
                _ => {}
            }
        }
    }
    Ok(None)
}

impl FastPathPlan {
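    /// Report which indexes this plan uses, and how, for `EXPLAIN` output and
    /// index-usage accounting.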
    pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
        match self {
            FastPathPlan::Constant(..) => UsedIndexes::default(),
            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
                if literal_constraints.is_some() {
                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
                } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
                } else {
                    UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
                }
            }
            FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
        }
    }
}

impl crate::coord::Coordinator {
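    /// Execute a [`PlannedPeek`] and return a response that the client can
    /// consume.
    ///
    /// Constant plans are finished and returned immediately; all other plans
    /// are turned into a peek command against the targeted cluster (building
    /// and later dropping a dedicated dataflow for slow-path peeks), with the
    /// rows streamed back via [`Self::create_peek_response_stream`].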
    #[mz_ore::instrument(level = "debug")]
    pub async fn implement_peek_plan(
        &mut self,
        ctx_extra: &mut ExecuteContextExtra,
        plan: PlannedPeek,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let PlannedPeek {
            plan: fast_path,
            determination,
            conn_id,
            intermediate_result_type,
            source_arity,
            source_ids,
        } = plan;

        if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
            let mut rows = match rows {
                Ok(rows) => rows,
                Err(e) => return Err(e.into()),
            };
            consolidate(&mut rows);

            let mut results = Vec::new();
            for (row, count) in rows {
                if count.is_negative() {
                    Err(EvalError::InvalidParameterValue(
                        format!("Negative multiplicity in constant result: {}", count).into(),
                    ))?
                };
                if count.is_positive() {
                    let count = usize::cast_from(
                        u64::try_from(count.into_inner())
                            .expect("known to be positive from check above"),
                    );
                    results.push((
                        row,
                        NonZeroUsize::new(count).expect("known to be non-zero from check above"),
                    ));
                }
            }
            let row_collection = RowCollection::new(results, &finishing.order_by);
            let duration_histogram = self.metrics.row_set_finishing_seconds();

            let (ret, reason) = match finishing.finish(
                row_collection,
                max_result_size,
                max_returned_query_size,
                &duration_histogram,
            ) {
                Ok((rows, row_size_bytes)) => {
                    let result_size = u64::cast_from(row_size_bytes);
                    let rows_returned = u64::cast_from(rows.count());
                    (
                        Ok(Self::send_immediate_rows(rows)),
                        StatementEndedExecutionReason::Success {
                            result_size: Some(result_size),
                            rows_returned: Some(rows_returned),
                            execution_strategy: Some(StatementExecutionStrategy::Constant),
                        },
                    )
                }
                Err(error) => (
                    Err(AdapterError::ResultSize(error.clone())),
                    StatementEndedExecutionReason::Errored { error },
                ),
            };
            self.retire_execution(reason, std::mem::take(ctx_extra));
            return ret;
        }

        let timestamp = determination.timestamp_context.timestamp_or_default();
        if let Some(id) = ctx_extra.contents() {
            self.set_statement_execution_timestamp(id, timestamp)
        }

        let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
            PeekPlan::FastPath(FastPathPlan::PeekExisting(
                _coll_id,
                idx_id,
                literal_constraints,
                map_filter_project,
            )) => (
                (literal_constraints, timestamp, map_filter_project),
                None,
                true,
                PeekTarget::Index { id: idx_id },
                StatementExecutionStrategy::FastPath,
            ),
            PeekPlan::FastPath(FastPathPlan::PeekPersist(
                coll_id,
                literal_constraint,
                map_filter_project,
            )) => {
                let peek_command = (
                    literal_constraint.map(|r| vec![r]),
                    timestamp,
                    map_filter_project,
                );
                let metadata = self
                    .controller
                    .storage
                    .collection_metadata(coll_id)
                    .expect("storage collection for fast-path peek")
                    .clone();
                (
                    peek_command,
                    None,
                    true,
                    PeekTarget::Persist {
                        id: coll_id,
                        metadata,
                    },
                    StatementExecutionStrategy::PersistFastPath,
                )
            }
            PeekPlan::SlowPath(PeekDataflowPlan {
                desc: dataflow,
                id: index_id,
                key: index_key,
                permutation: index_permutation,
                thinned_arity: index_thinned_arity,
            }) => {
                let output_ids = dataflow.export_ids().collect();

                self.controller
                    .compute
                    .create_dataflow(compute_instance, dataflow, None)
                    .map_err(
                        AdapterError::concurrent_dependency_drop_from_dataflow_creation_error,
                    )?;
                self.initialize_compute_read_policies(
                    output_ids,
                    compute_instance,
                    CompactionWindow::DisableCompaction,
                )
                .await;

                let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
                map_filter_project.permute_fn(
                    |c| index_permutation[c],
                    index_key.len() + index_thinned_arity,
                );
                let map_filter_project = mfp_to_safe_plan(map_filter_project)?;

                (
                    (None, timestamp, map_filter_project),
                    Some(index_id),
                    false,
                    PeekTarget::Index { id: index_id },
                    StatementExecutionStrategy::Standard,
                )
            }
            _ => {
                unreachable!()
            }
        };

        let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();

        let mut uuid = Uuid::new_v4();
        while self.pending_peeks.contains_key(&uuid) {
            uuid = Uuid::new_v4();
        }

        self.pending_peeks.insert(
            uuid,
            PendingPeek {
                conn_id: conn_id.clone(),
                cluster_id: compute_instance,
                depends_on: source_ids,
                ctx_extra: std::mem::take(ctx_extra),
                is_fast_path,
            },
        );
        self.client_pending_peeks
            .entry(conn_id)
            .or_default()
            .insert(uuid, compute_instance);
        let (literal_constraints, timestamp, map_filter_project) = peek_command;

        let peek_result_column_names =
            (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
        let peek_result_desc =
            RelationDesc::new(intermediate_result_type, peek_result_column_names);

        self.controller
            .compute
            .peek(
                compute_instance,
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                peek_result_desc,
                finishing.clone(),
                map_filter_project,
                target_replica,
                rows_tx,
            )
            .unwrap_or_terminate("cannot fail to peek");

        let duration_histogram = self.metrics.row_set_finishing_seconds();

        if let Some(index_id) = drop_dataflow {
            self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
            self.drop_compute_collections(vec![(compute_instance, index_id)]);
        }

        let persist_client = self.persist_client.clone();
        let peek_stash_read_batch_size_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_BATCH_SIZE_BYTES
                .get(self.catalog().system_config().dyncfgs());
        let peek_stash_read_memory_budget_bytes =
            mz_compute_types::dyncfgs::PEEK_RESPONSE_STASH_READ_MEMORY_BUDGET_BYTES
                .get(self.catalog().system_config().dyncfgs());

        let peek_response_stream = Self::create_peek_response_stream(
            rows_rx,
            finishing,
            max_result_size,
            max_returned_query_size,
            duration_histogram,
            persist_client,
            peek_stash_read_batch_size_bytes,
            peek_stash_read_memory_budget_bytes,
        );

        Ok(crate::ExecuteResponse::SendingRowsStreaming {
            rows: Box::pin(peek_response_stream),
            instance_id: compute_instance,
            strategy,
        })
    }

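    /// Build the stream of [`PeekResponseUnary`] values that is handed back to
    /// the client for a non-constant peek.
    ///
    /// Inline row responses are finished directly. Stashed responses (results
    /// that the cluster spilled to persist batches) are read back in batches
    /// on a background task and finished incrementally, and the stashed
    /// batches are deleted once the cursor is exhausted.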
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn create_peek_response_stream(
        rows_rx: tokio::sync::oneshot::Receiver<PeekResponse>,
        finishing: RowSetFinishing,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
        duration_histogram: prometheus::Histogram,
        mut persist_client: mz_persist_client::PersistClient,
        peek_stash_read_batch_size_bytes: usize,
        peek_stash_read_memory_budget_bytes: usize,
    ) -> impl futures::Stream<Item = PeekResponseUnary> {
        async_stream::stream!({
            let result = rows_rx.await;

            let rows = match result {
                Ok(rows) => rows,
                Err(e) => {
                    yield PeekResponseUnary::Error(e.to_string());
                    return;
                }
            };

            match rows {
                PeekResponse::Rows(rows) => {
                    match finishing.finish(
                        rows,
                        max_result_size,
                        max_returned_query_size,
                        &duration_histogram,
                    ) {
                        Ok((rows, _size_bytes)) => yield PeekResponseUnary::Rows(Box::new(rows)),
                        Err(e) => yield PeekResponseUnary::Error(e),
                    }
                }
                PeekResponse::Stashed(response) => {
                    let response = *response;

                    let shard_id = response.shard_id;

                    let mut batches = Vec::new();
                    for proto_batch in response.batches.into_iter() {
                        let batch =
                            persist_client.batch_from_transmittable_batch(&shard_id, proto_batch);

                        batches.push(batch);
                    }
                    tracing::trace!(?batches, "stashed peek response");

                    let as_of = Antichain::from_elem(mz_repr::Timestamp::default());
                    let read_schemas: Schemas<SourceData, ()> = Schemas {
                        id: None,
                        key: Arc::new(response.relation_desc.clone()),
                        val: Arc::new(UnitSchema),
                    };

                    let mut row_cursor = persist_client
                        .read_batches_consolidated::<_, _, _, i64>(
                            response.shard_id,
                            as_of,
                            read_schemas,
                            batches,
                            |_stats| true,
                            peek_stash_read_memory_budget_bytes,
                        )
                        .await
                        .expect("invalid usage");

                    let (tx, mut rx) = tokio::sync::mpsc::channel(1);
                    mz_ore::task::spawn(|| "read_peek_batches", async move {
                        let result = tx.send(response.inline_rows).await;
                        if result.is_err() {
                            tracing::error!("receiver went away");
                        }

                        let mut current_batch = Vec::new();
                        let mut current_batch_size: usize = 0;

                        'outer: while let Some(rows) = row_cursor.next().await {
                            for ((key, _val), _ts, diff) in rows {
                                let source_data = key.expect("decoding error");

                                let row = source_data
                                    .0
                                    .expect("we are not sending errors on this code path");

                                let diff = usize::try_from(diff)
                                    .expect("peek responses cannot have negative diffs");

                                if diff > 0 {
                                    let diff =
                                        NonZeroUsize::new(diff).expect("checked to be non-zero");
                                    current_batch_size =
                                        current_batch_size.saturating_add(row.byte_len());
                                    current_batch.push((row, diff));
                                }

                                if current_batch_size > peek_stash_read_batch_size_bytes {
                                    let result = tx
                                        .send(RowCollection::new(
                                            current_batch.drain(..).collect_vec(),
                                            &[],
                                        ))
                                        .await;
                                    if result.is_err() {
                                        tracing::error!("receiver went away");
                                        break 'outer;
                                    }

                                    current_batch_size = 0;
                                }
                            }
                        }

                        if current_batch.len() > 0 {
                            let result = tx.send(RowCollection::new(current_batch, &[])).await;
                            if result.is_err() {
                                tracing::error!("receiver went away");
                            }
                        }

                        let batches = row_cursor.into_lease();
                        tracing::trace!(?response.shard_id, "cleaning up batches of peek result");
                        for batch in batches {
                            batch.delete().await;
                        }
                    });

                    assert!(
                        finishing.is_streamable(response.relation_desc.arity()),
                        "can only get stashed responses when the finishing is streamable"
                    );

                    tracing::trace!("query result is streamable!");

                    assert!(finishing.is_streamable(response.relation_desc.arity()));
                    let mut incremental_finishing = RowSetFinishingIncremental::new(
                        finishing.offset,
                        finishing.limit,
                        finishing.project,
                        max_returned_query_size,
                    );

                    let mut got_zero_rows = true;
                    while let Some(rows) = rx.recv().await {
                        got_zero_rows = false;

                        let result_rows = incremental_finishing.finish_incremental(
                            rows,
                            max_result_size,
                            &duration_histogram,
                        );

                        match result_rows {
                            Ok(result_rows) => yield PeekResponseUnary::Rows(Box::new(result_rows)),
                            Err(e) => yield PeekResponseUnary::Error(e),
                        }
                    }

                    if got_zero_rows {
                        let row_iter = vec![].into_row_iter();
                        yield PeekResponseUnary::Rows(Box::new(row_iter));
                    }
                }
                PeekResponse::Canceled => {
                    yield PeekResponseUnary::Canceled;
                }
                PeekResponse::Error(e) => {
                    yield PeekResponseUnary::Error(e);
                }
            }
        })
    }

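    /// Cancel all pending peeks issued by `conn_id`: instruct the relevant
    /// clusters to cancel them, remove them from the coordinator's pending
    /// state, and retire their statement executions as canceled.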
    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
        if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
            self.metrics
                .canceled_peeks
                .inc_by(u64::cast_from(uuids.len()));

            let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
            for (uuid, compute_instance) in &uuids {
                inverse.entry(*compute_instance).or_default().insert(*uuid);
            }
            for (compute_instance, uuids) in inverse {
                for uuid in uuids {
                    let _ = self.controller.compute.cancel_peek(
                        compute_instance,
                        uuid,
                        PeekResponse::Canceled,
                    );
                }
            }

            let peeks = uuids
                .iter()
                .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
                .collect::<Vec<_>>();
            for peek in peeks {
                self.retire_execution(StatementEndedExecutionReason::Canceled, peek.ctx_extra);
            }
        }
    }

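    /// Handle a notification from a cluster that a peek has finished
    /// (successfully, with an error, or canceled), retiring the corresponding
    /// statement execution if the peek is still tracked.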
    pub(crate) fn handle_peek_notification(
        &mut self,
        uuid: Uuid,
        notification: PeekNotification,
        otel_ctx: OpenTelemetryContext,
    ) {
        if let Some(PendingPeek {
            conn_id: _,
            cluster_id: _,
            depends_on: _,
            ctx_extra,
            is_fast_path,
        }) = self.remove_pending_peek(&uuid)
        {
            let reason = match notification {
                PeekNotification::Success {
                    rows: num_rows,
                    result_size,
                } => {
                    let strategy = if is_fast_path {
                        StatementExecutionStrategy::FastPath
                    } else {
                        StatementExecutionStrategy::Standard
                    };
                    StatementEndedExecutionReason::Success {
                        result_size: Some(result_size),
                        rows_returned: Some(num_rows),
                        execution_strategy: Some(strategy),
                    }
                }
                PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
                PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
            };
            otel_ctx.attach_as_parent();
            self.retire_execution(reason, ctx_extra);
        }
    }

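    /// Remove a pending peek by `uuid`, also cleaning up the per-connection
    /// index of pending peeks, and return it if it was present.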
    pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
        let pending_peek = self.pending_peeks.remove(uuid);
        if let Some(pending_peek) = &pending_peek {
            let uuids = self
                .client_pending_peeks
                .get_mut(&pending_peek.conn_id)
                .expect("coord peek state is inconsistent");
            uuids.remove(uuid);
            if uuids.is_empty() {
                self.client_pending_peeks.remove(&pending_peek.conn_id);
            }
        }
        pending_peek
    }

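    /// Implement a peek that must build a dedicated dataflow (the slow path)
    /// by wrapping it in a [`PlannedPeek`] with a default
    /// [`ExecuteContextExtra`] and delegating to
    /// [`Self::implement_peek_plan`].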
    pub(crate) async fn implement_slow_path_peek(
        &mut self,
        dataflow_plan: PeekDataflowPlan<mz_repr::Timestamp>,
        determination: TimestampDetermination<mz_repr::Timestamp>,
        finishing: RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        intermediate_result_type: SqlRelationType,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        max_result_size: u64,
        max_query_result_size: Option<u64>,
    ) -> Result<ExecuteResponse, AdapterError> {
        let source_arity = intermediate_result_type.arity();

        let planned_peek = PlannedPeek {
            plan: PeekPlan::SlowPath(dataflow_plan),
            determination,
            conn_id,
            intermediate_result_type,
            source_arity,
            source_ids,
        };

        let mut ctx_extra = ExecuteContextExtra::default();

        self.implement_peek_plan(
            &mut ctx_extra,
            planned_peek,
            finishing,
            compute_instance,
            target_replica,
            max_result_size,
            max_query_result_size,
        )
        .await
    }

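    /// Ship a `COPY ... TO` (S3) dataflow: validate that the dataflow has
    /// exactly one S3 oneshot sink, run the S3 preflight checks, register an
    /// [`ActiveCopyTo`] sink, ship the dataflow, and spawn a task that relays
    /// the sink's completion back to the waiting client via `tx`.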
    pub(crate) async fn implement_copy_to(
        &mut self,
        df_desc: DataflowDescription<mz_compute_types::plan::Plan>,
        compute_instance: ComputeInstanceId,
        target_replica: Option<ReplicaId>,
        source_ids: BTreeSet<GlobalId>,
        conn_id: ConnectionId,
        tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
    ) {
        let send_err = |tx: oneshot::Sender<Result<ExecuteResponse, AdapterError>>,
                        e: AdapterError| {
            let _ = tx.send(Err(e));
        };

        let sink_id = df_desc.sink_id();

        let connection_context = self.connection_context().clone();
        let sinks = &df_desc.sink_exports;

        if sinks.len() != 1 {
            send_err(
                tx,
                AdapterError::Internal("expected exactly one copy to s3 sink".into()),
            );
            return;
        }
        let (sink_id_check, sink_desc) = sinks
            .first_key_value()
            .expect("known to be exactly one copy to s3 sink");
        assert_eq!(sink_id, *sink_id_check);

        match &sink_desc.connection {
            ComputeSinkConnection::CopyToS3Oneshot(conn) => {
                if let Err(e) = mz_storage_types::sinks::s3_oneshot_sink::preflight(
                    connection_context,
                    &conn.aws_connection,
                    &conn.upload_info,
                    conn.connection_id,
                    sink_id,
                )
                .await
                {
                    send_err(tx, e.into());
                    return;
                }
            }
            _ => {
                send_err(
                    tx,
                    AdapterError::Internal("expected copy to s3 oneshot sink".into()),
                );
                return;
            }
        }

        let (sink_tx, sink_rx) = oneshot::channel();
        let active_copy_to = ActiveCopyTo {
            conn_id: conn_id.clone(),
            tx: sink_tx,
            cluster_id: compute_instance,
            depends_on: source_ids,
        };

        drop(
            self.add_active_compute_sink(sink_id, ActiveComputeSink::CopyTo(active_copy_to))
                .await,
        );

        if let Err(e) = self
            .try_ship_dataflow(df_desc, compute_instance, target_replica)
            .await
            .map_err(AdapterError::concurrent_dependency_drop_from_dataflow_creation_error)
        {
            self.remove_active_compute_sink(sink_id).await;
            let _ = tx.send(Err(e));
            return;
        }

        let span = Span::current();
        task::spawn(
            || "copy to completion",
            async move {
                let res = sink_rx.await;
                let result = match res {
                    Ok(res) => res,
                    Err(_) => Err(AdapterError::Internal("copy to sender dropped".into())),
                };
                let _ = tx.send(result);
            }
            .instrument(span),
        );
    }

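    /// Build an [`ExecuteResponse`] that returns the given rows to the client
    /// immediately, without involving a cluster at all.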
    pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
    where
        I: IntoRowIterator,
        I::Iter: Send + Sync + 'static,
    {
        let rows = Box::new(rows.into_row_iter());
        ExecuteResponse::SendingRowsImmediate { rows }
    }
}

#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{Datum, SqlColumnType, SqlScalarType};

    use super::*;

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_fast_path_plan_as_text() {
        let typ = SqlRelationType::new(vec![SqlColumnType {
            scalar_type: SqlScalarType::String,
            nullable: false,
        }]);
        let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
        let no_lookup = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            MapFilterProject::new(4)
                .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
                .project([1, 4])
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );
        let lookup = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            MapFilterProject::new(3)
                .filter(Some(
                    MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
                ))
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );

        let humanizer = DummyHumanizer;
        let config = ExplainConfig {
            redacted: false,
            verbose_syntax: true,
            ..Default::default()
        };
        let ctx_gen = || {
            let indent = Indent::default();
            let annotations = BTreeMap::new();
            PlanRenderingContext::<FastPathPlan>::new(indent, &humanizer, annotations, &config)
        };

        let constant_err_exp = "Error \"division by zero\"\n";
        let no_lookup_exp = "Project (#1, #4)\n Map ((#0 OR #2))\n ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
        let lookup_exp =
            "Filter (#0) IS NULL\n ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";

        assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
        assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
        assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);

        let mut constant_rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let constant_exp1 =
            "Constant\n - (\"hello\")\n - ((\"world\") x 2)\n - ((\"star\") x 500)\n";
        assert_eq!(
            text_string_at(
                &FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
                ctx_gen
            ),
            constant_exp1
        );
        constant_rows
            .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
        let constant_exp2 = "Constant\n total_rows (diffs absorbed): 523\n first_rows:\n - (\"hello\")\
            \n - ((\"world\") x 2)\n - ((\"star\") x 500)\n - (\"0\")\n - (\"1\")\
            \n - (\"2\")\n - (\"3\")\n - (\"4\")\n - (\"5\")\n - (\"6\")\
            \n - (\"7\")\n - (\"8\")\n - (\"9\")\n - (\"10\")\n - (\"11\")\
            \n - (\"12\")\n - (\"13\")\n - (\"14\")\n - (\"15\")\n - (\"16\")\n";
        assert_eq!(
            text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
            constant_exp2
        );
    }
}