1use std::collections::{BTreeMap, BTreeSet};
16use std::fmt;
17use std::num::NonZeroUsize;
18
19use differential_dataflow::consolidation::consolidate;
20use futures::TryFutureExt;
21use mz_adapter_types::compaction::CompactionWindow;
22use mz_adapter_types::connection::ConnectionId;
23use mz_cluster_client::ReplicaId;
24use mz_compute_client::controller::PeekNotification;
25use mz_compute_client::protocol::command::PeekTarget;
26use mz_compute_client::protocol::response::PeekResponse;
27use mz_compute_types::ComputeInstanceId;
28use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
29use mz_controller_types::ClusterId;
30use mz_expr::explain::{HumanizedExplain, HumanizerMode, fmt_text_constant_rows};
31use mz_expr::row::RowCollection;
32use mz_expr::{
33 EvalError, Id, MirRelationExpr, MirScalarExpr, OptimizedMirRelationExpr, RowSetFinishing,
34 permutation_for_arrangement,
35};
36use mz_ore::cast::CastFrom;
37use mz_ore::str::{StrExt, separated};
38use mz_ore::tracing::OpenTelemetryContext;
39use mz_repr::explain::text::DisplayText;
40use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
41use mz_repr::{Diff, GlobalId, IntoRowIterator, RelationType, Row, RowIterator, preserves_order};
42use serde::{Deserialize, Serialize};
43use timely::progress::Timestamp;
44use uuid::Uuid;
45
46use crate::coord::timestamp_selection::TimestampDetermination;
47use crate::optimize::OptimizerError;
48use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
49use crate::util::ResultExt;
50use crate::{AdapterError, ExecuteContextExtra, ExecuteResponse};
51
/// Bookkeeping the coordinator keeps for a peek that has been issued but not
/// yet retired.
#[derive(Debug)]
pub(crate) struct PendingPeek {
    /// Connection the peek is registered under in `client_pending_peeks`.
    pub(crate) conn_id: ConnectionId,
    /// Cluster (compute instance) the peek was issued against.
    pub(crate) cluster_id: ClusterId,
    /// IDs recorded as the peek's sources when it was planned.
    pub(crate) depends_on: BTreeSet<GlobalId>,
    /// Statement-execution context that is handed to `retire_execution` once
    /// the peek finishes or is canceled.
    pub(crate) ctx_extra: ExecuteContextExtra,
    /// `true` for peeks against an existing index or a persist shard, `false`
    /// when a transient dataflow was built for the peek.
    pub(crate) is_fast_path: bool,
}
67
/// Outcome of a single peek as delivered through the response future built in
/// `implement_peek_plan`.
#[derive(Debug)]
pub enum PeekResponseUnary {
    /// The result rows, after the `RowSetFinishing` has been applied.
    Rows(Box<dyn RowIterator + Send + Sync>),
    /// The peek failed; carries the error message.
    Error(String),
    /// The peek was canceled.
    Canceled,
}
78
/// Slow-path peek: a dataflow to build, plus the description of the
/// arrangement from which its result is read.
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
    /// Dataflow that is created on the compute instance before peeking.
    pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
    /// ID of the index that is peeked and dropped again once the peek is sent.
    pub(crate) id: GlobalId,
    /// Arrangement key; `new` sets it to the default key columns of the
    /// result type.
    key: Vec<MirScalarExpr>,
    /// Column permutation returned by `permutation_for_arrangement` for `key`.
    permutation: Vec<usize>,
    /// Length of the thinning returned by `permutation_for_arrangement`.
    thinned_arity: usize,
}
87
88impl<T> PeekDataflowPlan<T> {
89 pub fn new(
90 desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
91 id: GlobalId,
92 typ: &RelationType,
93 ) -> Self {
94 let arity = typ.arity();
95 let key = typ
96 .default_key()
97 .into_iter()
98 .map(MirScalarExpr::Column)
99 .collect::<Vec<_>>();
100 let (permutation, thinning) = permutation_for_arrangement(&key, arity);
101 Self {
102 desc,
103 id,
104 key,
105 permutation,
106 thinned_arity: thinning.len(),
107 }
108 }
109}
110
/// The ways a peek can be answered without building a new dataflow.
///
/// NOTE: the enum derives `Ord`, so the order of the variants is significant.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
    /// The view evaluates to a constant: either its rows with their diffs or
    /// an evaluation error, together with the relation type.
    Constant(Result<Vec<(Row, Diff)>, EvalError>, RelationType),
    /// Read from an existing index: the ID of the indexed collection, the ID
    /// of the index, optional literal constraints (lookup rows), and the MFP
    /// to apply to what is read.
    PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
    /// Read the collection with the given ID directly from persist, with an
    /// optional literal constraint row and the MFP to apply.
    PeekPersist(GlobalId, Option<Row>, mz_expr::SafeMfpPlan),
}
124
125impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
126 fn fmt_text(
127 &self,
128 f: &mut fmt::Formatter<'_>,
129 ctx: &mut PlanRenderingContext<'a, T>,
130 ) -> fmt::Result {
131 let redacted = ctx.config.redacted;
132 let mode = HumanizedExplain::new(ctx.config.redacted);
133
134 match self {
137 FastPathPlan::Constant(Ok(rows), _) => {
138 if !rows.is_empty() {
139 writeln!(f, "{}Constant", ctx.indent)?;
140 *ctx.as_mut() += 1;
141 fmt_text_constant_rows(
142 f,
143 rows.iter().map(|(row, diff)| (row, diff)),
144 ctx.as_mut(),
145 redacted,
146 )?;
147 *ctx.as_mut() -= 1;
148 } else {
149 writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
150 }
151 Ok(())
152 }
153 FastPathPlan::Constant(Err(err), _) => {
154 if redacted {
155 writeln!(f, "{}Error â–ˆ", ctx.as_mut())
156 } else {
157 writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
158 }
159 }
160 FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
161 ctx.as_mut().set();
162 let (map, filter, project) = mfp.as_map_filter_project();
163
164 let cols = if !ctx.config.humanized_exprs {
165 None
166 } else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
167 let cols = itertools::chain(
171 cols.iter().cloned(),
172 std::iter::repeat(String::new()).take(map.len()),
173 )
174 .collect();
175 Some(cols)
176 } else {
177 None
178 };
179
180 if project.len() != mfp.input_arity + map.len()
181 || !project.iter().enumerate().all(|(i, o)| i == *o)
182 {
183 let outputs = mode.seq(&project, cols.as_ref());
184 let outputs = CompactScalars(outputs);
185 writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
186 *ctx.as_mut() += 1;
187 }
188 if !filter.is_empty() {
189 let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
190 writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
191 *ctx.as_mut() += 1;
192 }
193 if !map.is_empty() {
194 let scalars = mode.seq(&map, cols.as_ref());
195 let scalars = CompactScalars(scalars);
196 writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
197 *ctx.as_mut() += 1;
198 }
199 MirRelationExpr::fmt_indexed_filter(
200 f,
201 ctx,
202 coll_id,
203 idx_id,
204 literal_constraints.clone(),
205 None,
206 )?;
207 writeln!(f)?;
208 ctx.as_mut().reset();
209 Ok(())
210 }
211 FastPathPlan::PeekPersist(gid, literal_constraint, mfp) => {
212 ctx.as_mut().set();
213 let (map, filter, project) = mfp.as_map_filter_project();
214
215 let cols = if !ctx.config.humanized_exprs {
216 None
217 } else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
218 let cols = itertools::chain(
219 cols.iter().cloned(),
220 std::iter::repeat(String::new()).take(map.len()),
221 )
222 .collect::<Vec<_>>();
223 Some(cols)
224 } else {
225 None
226 };
227
228 if project.len() != mfp.input_arity + map.len()
229 || !project.iter().enumerate().all(|(i, o)| i == *o)
230 {
231 let outputs = mode.seq(&project, cols.as_ref());
232 let outputs = CompactScalars(outputs);
233 writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
234 *ctx.as_mut() += 1;
235 }
236 if !filter.is_empty() {
237 let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
238 writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
239 *ctx.as_mut() += 1;
240 }
241 if !map.is_empty() {
242 let scalars = mode.seq(&map, cols.as_ref());
243 let scalars = CompactScalars(scalars);
244 writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
245 *ctx.as_mut() += 1;
246 }
247 let human_id = ctx
248 .humanizer
249 .humanize_id(*gid)
250 .unwrap_or_else(|| gid.to_string());
251 write!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
252 if let Some(literal) = literal_constraint {
253 let value = mode.expr(literal, None);
254 writeln!(f, " [value={}]", value)?;
255 } else {
256 writeln!(f, "")?;
257 }
258 ctx.as_mut().reset();
259 Ok(())
260 }
261 }?;
262 Ok(())
263 }
264}
265
/// A fully planned peek, as consumed by `Coordinator::implement_peek_plan`.
#[derive(Debug)]
pub struct PlannedPeek {
    /// How the peek is answered (fast path or slow path).
    pub plan: PeekPlan,
    /// Timestamp determination; its timestamp context supplies the timestamp
    /// at which the peek is issued.
    pub determination: TimestampDetermination<mz_repr::Timestamp>,
    /// Connection the peek is registered under.
    pub conn_id: ConnectionId,
    /// Input arity of the identity MFP that is built for slow-path peeks.
    pub source_arity: usize,
    /// IDs stored as `PendingPeek::depends_on` for the issued peek.
    pub source_ids: BTreeSet<GlobalId>,
}
274
/// The two strategies for answering a peek.
#[derive(Clone, Debug)]
pub enum PeekPlan<T = mz_repr::Timestamp> {
    /// Answer without building a dataflow; see [`FastPathPlan`].
    FastPath(FastPathPlan),
    /// Build a transient dataflow and peek its index; see
    /// [`PeekDataflowPlan`].
    SlowPath(PeekDataflowPlan<T>),
}
282
283fn mfp_to_safe_plan(
286 mfp: mz_expr::MapFilterProject,
287) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
288 mfp.into_plan()
289 .map_err(OptimizerError::Internal)?
290 .into_nontemporal()
291 .map_err(|_e| OptimizerError::UnsafeMfpPlan)
292}
293
294fn permute_oneshot_mfp_around_index(
295 mfp: mz_expr::MapFilterProject,
296 key: &[MirScalarExpr],
297) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
298 let input_arity = mfp.input_arity;
299 let mut safe_mfp = mfp_to_safe_plan(mfp)?;
300 let (permute, thinning) = mz_expr::permutation_for_arrangement(key, input_arity);
301 safe_mfp.permute_fn(|c| permute[c], key.len() + thinning.len());
302 Ok(safe_mfp)
303}
304
305pub fn create_fast_path_plan<T: Timestamp>(
311 dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
312 view_id: GlobalId,
313 finishing: Option<&RowSetFinishing>,
314 persist_fast_path_limit: usize,
315 persist_fast_path_order: bool,
316) -> Result<Option<FastPathPlan>, OptimizerError> {
317 if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
323 {
324 let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
325 if let Some((rows, found_typ)) = mir.as_const() {
326 return Ok(Some(FastPathPlan::Constant(
328 rows.clone()
329 .map(|rows| rows.into_iter().map(|(row, diff)| (row, diff)).collect()),
330 found_typ.clone(),
331 )));
332 } else {
333 if let MirRelationExpr::TopK {
336 input,
337 group_key,
338 order_key,
339 limit,
340 offset,
341 monotonic: _,
342 expected_group_size: _,
343 } = mir
344 {
345 if let Some(finishing) = finishing {
346 if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
347 let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
349 (None, _) => true,
350 (Some(..), None) => false,
351 (Some(topk_limit), Some(finishing_limit)) => {
352 if let Some(l) = topk_limit.as_literal_int64() {
353 l >= *finishing_limit
354 } else {
355 false
356 }
357 }
358 };
359 if finishing_limits_at_least_as_topk {
360 mir = input;
361 }
362 }
363 }
364 }
365 let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
369 match mir {
370 MirRelationExpr::Get {
371 id: Id::Global(get_id),
372 typ: relation_typ,
373 ..
374 } => {
375 for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
377 if desc.on_id == *get_id {
378 return Ok(Some(FastPathPlan::PeekExisting(
379 *get_id,
380 *index_id,
381 None,
382 permute_oneshot_mfp_around_index(mfp, &desc.key)?,
383 )));
384 }
385 }
386
387 let safe_mfp = mfp_to_safe_plan(mfp)?;
391 let (_maps, filters, projection) = safe_mfp.as_map_filter_project();
392
393 let literal_constraint = if persist_fast_path_order {
394 let mut row = Row::default();
395 let mut packer = row.packer();
396 for (idx, col) in relation_typ.column_types.iter().enumerate() {
397 if !preserves_order(&col.scalar_type) {
398 break;
399 }
400 let col_expr = MirScalarExpr::Column(idx);
401
402 let Some((literal, _)) = filters
403 .iter()
404 .filter_map(|f| f.expr_eq_literal(&col_expr))
405 .next()
406 else {
407 break;
408 };
409 packer.extend_by_row(&literal);
410 }
411 if row.is_empty() { None } else { Some(row) }
412 } else {
413 None
414 };
415
416 let finish_ok = match &finishing {
417 None => false,
418 Some(RowSetFinishing {
419 order_by,
420 limit,
421 offset,
422 ..
423 }) => {
424 let order_ok = if persist_fast_path_order {
425 order_by.iter().enumerate().all(|(idx, order)| {
426 let column_idx = projection[order.column];
429 if column_idx >= safe_mfp.input_arity {
430 return false;
431 }
432 let column_type = &relation_typ.column_types[column_idx];
433 let index_ok = idx == column_idx;
434 let nulls_ok = !column_type.nullable || order.nulls_last;
435 let asc_ok = !order.desc;
436 let type_ok = preserves_order(&column_type.scalar_type);
437 index_ok && nulls_ok && asc_ok && type_ok
438 })
439 } else {
440 order_by.is_empty()
441 };
442 let limit_ok = limit.map_or(false, |l| {
443 usize::cast_from(l) + *offset < persist_fast_path_limit
444 });
445 order_ok && limit_ok
446 }
447 };
448
449 let key_constraint = if let Some(literal) = &literal_constraint {
450 let prefix_len = literal.iter().count();
451 relation_typ
452 .keys
453 .iter()
454 .any(|k| k.iter().all(|idx| *idx < prefix_len))
455 } else {
456 false
457 };
458
459 if key_constraint || (filters.is_empty() && finish_ok) {
463 return Ok(Some(FastPathPlan::PeekPersist(
464 *get_id,
465 literal_constraint,
466 safe_mfp,
467 )));
468 }
469 }
470 MirRelationExpr::Join { implementation, .. } => {
471 if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
472 implementation
473 {
474 return Ok(Some(FastPathPlan::PeekExisting(
475 *coll_id,
476 *idx_id,
477 Some(vals.clone()),
478 permute_oneshot_mfp_around_index(mfp, key)?,
479 )));
480 }
481 }
482 _ => {}
484 }
485 }
486 }
487 Ok(None)
488}
489
490impl FastPathPlan {
491 pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
492 match self {
493 FastPathPlan::Constant(..) => UsedIndexes::default(),
494 FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
495 if literal_constraints.is_some() {
496 UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
497 } else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
498 UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
499 } else {
500 UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
501 }
502 }
503 FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
504 }
505 }
506}
507
508impl crate::coord::Coordinator {
509 #[mz_ore::instrument(level = "debug")]
511 pub async fn implement_peek_plan(
512 &mut self,
513 ctx_extra: &mut ExecuteContextExtra,
514 plan: PlannedPeek,
515 finishing: RowSetFinishing,
516 compute_instance: ComputeInstanceId,
517 target_replica: Option<ReplicaId>,
518 max_result_size: u64,
519 max_returned_query_size: Option<u64>,
520 ) -> Result<crate::ExecuteResponse, AdapterError> {
521 let PlannedPeek {
522 plan: fast_path,
523 determination,
524 conn_id,
525 source_arity,
526 source_ids,
527 } = plan;
528
529 if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
531 let mut rows = match rows {
532 Ok(rows) => rows,
533 Err(e) => return Err(e.into()),
534 };
535 consolidate(&mut rows);
537
538 let mut results = Vec::new();
539 for (row, count) in rows {
540 if count.is_negative() {
541 Err(EvalError::InvalidParameterValue(
542 format!("Negative multiplicity in constant result: {}", count).into(),
543 ))?
544 };
545 if count.is_positive() {
546 let count = usize::cast_from(
547 u64::try_from(count.into_inner())
548 .expect("known to be positive from check above"),
549 );
550 results.push((
551 row,
552 NonZeroUsize::new(count).expect("known to be non-zero from check above"),
553 ));
554 }
555 }
556 let row_collection = RowCollection::new(results, &finishing.order_by);
557 let duration_histogram = self.metrics.row_set_finishing_seconds();
558
559 let (ret, reason) = match finishing.finish(
560 row_collection,
561 max_result_size,
562 max_returned_query_size,
563 &duration_histogram,
564 ) {
565 Ok((rows, row_size_bytes)) => {
566 let result_size = u64::cast_from(row_size_bytes);
567 let rows_returned = u64::cast_from(rows.count());
568 (
569 Ok(Self::send_immediate_rows(rows)),
570 StatementEndedExecutionReason::Success {
571 result_size: Some(result_size),
572 rows_returned: Some(rows_returned),
573 execution_strategy: Some(StatementExecutionStrategy::Constant),
574 },
575 )
576 }
577 Err(error) => (
578 Err(AdapterError::ResultSize(error.clone())),
579 StatementEndedExecutionReason::Errored { error },
580 ),
581 };
582 self.retire_execution(reason, std::mem::take(ctx_extra));
583 return ret;
584 }
585
586 let timestamp = determination.timestamp_context.timestamp_or_default();
587 if let Some(id) = ctx_extra.contents() {
588 self.set_statement_execution_timestamp(id, timestamp)
589 }
590
591 let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
598 PeekPlan::FastPath(FastPathPlan::PeekExisting(
599 _coll_id,
600 idx_id,
601 literal_constraints,
602 map_filter_project,
603 )) => (
604 (literal_constraints, timestamp, map_filter_project),
605 None,
606 true,
607 PeekTarget::Index { id: idx_id },
608 StatementExecutionStrategy::FastPath,
609 ),
610 PeekPlan::FastPath(FastPathPlan::PeekPersist(
611 coll_id,
612 literal_constraint,
613 map_filter_project,
614 )) => {
615 let peek_command = (
616 literal_constraint.map(|r| vec![r]),
617 timestamp,
618 map_filter_project,
619 );
620 let metadata = self
621 .controller
622 .storage
623 .collection_metadata(coll_id)
624 .expect("storage collection for fast-path peek")
625 .clone();
626 (
627 peek_command,
628 None,
629 true,
630 PeekTarget::Persist {
631 id: coll_id,
632 metadata,
633 },
634 StatementExecutionStrategy::PersistFastPath,
635 )
636 }
637 PeekPlan::SlowPath(PeekDataflowPlan {
638 desc: dataflow,
639 id: index_id,
643 key: index_key,
644 permutation: index_permutation,
645 thinned_arity: index_thinned_arity,
646 }) => {
647 let output_ids = dataflow.export_ids().collect();
648
649 self.controller
651 .compute
652 .create_dataflow(compute_instance, dataflow, None)
653 .unwrap_or_terminate("cannot fail to create dataflows");
654 self.initialize_compute_read_policies(
655 output_ids,
656 compute_instance,
657 CompactionWindow::DisableCompaction,
659 )
660 .await;
661
662 let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
664 map_filter_project.permute_fn(
665 |c| index_permutation[c],
666 index_key.len() + index_thinned_arity,
667 );
668 let map_filter_project = mfp_to_safe_plan(map_filter_project)?;
669 (
670 (None, timestamp, map_filter_project),
671 Some(index_id),
672 false,
673 PeekTarget::Index { id: index_id },
674 StatementExecutionStrategy::Standard,
675 )
676 }
677 _ => {
678 unreachable!()
679 }
680 };
681
682 let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();
684
685 let mut uuid = Uuid::new_v4();
688 while self.pending_peeks.contains_key(&uuid) {
689 uuid = Uuid::new_v4();
690 }
691
692 self.pending_peeks.insert(
695 uuid,
696 PendingPeek {
697 conn_id: conn_id.clone(),
698 cluster_id: compute_instance,
699 depends_on: source_ids,
700 ctx_extra: std::mem::take(ctx_extra),
701 is_fast_path,
702 },
703 );
704 self.client_pending_peeks
705 .entry(conn_id)
706 .or_default()
707 .insert(uuid, compute_instance);
708 let (literal_constraints, timestamp, map_filter_project) = peek_command;
709
710 self.controller
711 .compute
712 .peek(
713 compute_instance,
714 peek_target,
715 literal_constraints,
716 uuid,
717 timestamp,
718 finishing.clone(),
719 map_filter_project,
720 target_replica,
721 rows_tx,
722 )
723 .unwrap_or_terminate("cannot fail to peek");
724 let duration_histogram = self.metrics.row_set_finishing_seconds();
725
726 let rows_rx = rows_rx.map_ok_or_else(
728 |e| PeekResponseUnary::Error(e.to_string()),
729 move |resp| match resp {
730 PeekResponse::Rows(rows) => {
731 match finishing.finish(
732 rows,
733 max_result_size,
734 max_returned_query_size,
735 &duration_histogram,
736 ) {
737 Ok((rows, _size_bytes)) => PeekResponseUnary::Rows(Box::new(rows)),
738 Err(e) => PeekResponseUnary::Error(e),
739 }
740 }
741 PeekResponse::Canceled => PeekResponseUnary::Canceled,
742 PeekResponse::Error(e) => PeekResponseUnary::Error(e),
743 },
744 );
745
746 if let Some(index_id) = drop_dataflow {
748 self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
749 self.drop_indexes(vec![(compute_instance, index_id)]);
750 }
751
752 Ok(crate::ExecuteResponse::SendingRows {
753 future: Box::pin(rows_rx),
754 instance_id: compute_instance,
755 strategy,
756 })
757 }
758
759 #[mz_ore::instrument(level = "debug")]
761 pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
762 if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
763 self.metrics
764 .canceled_peeks
765 .with_label_values(&[])
766 .inc_by(u64::cast_from(uuids.len()));
767
768 let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
769 for (uuid, compute_instance) in &uuids {
770 inverse.entry(*compute_instance).or_default().insert(*uuid);
771 }
772 for (compute_instance, uuids) in inverse {
773 for uuid in uuids {
778 let _ = self.controller.compute.cancel_peek(
779 compute_instance,
780 uuid,
781 PeekResponse::Canceled,
782 );
783 }
784 }
785
786 let peeks = uuids
787 .iter()
788 .filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
789 .collect::<Vec<_>>();
790 for peek in peeks {
791 self.retire_execution(StatementEndedExecutionReason::Canceled, peek.ctx_extra);
792 }
793 }
794 }
795
796 pub(crate) fn handle_peek_notification(
799 &mut self,
800 uuid: Uuid,
801 notification: PeekNotification,
802 otel_ctx: OpenTelemetryContext,
803 ) {
804 if let Some(PendingPeek {
807 conn_id: _,
808 cluster_id: _,
809 depends_on: _,
810 ctx_extra,
811 is_fast_path,
812 }) = self.remove_pending_peek(&uuid)
813 {
814 let reason = match notification {
815 PeekNotification::Success {
816 rows: num_rows,
817 result_size,
818 } => {
819 let strategy = if is_fast_path {
820 StatementExecutionStrategy::FastPath
821 } else {
822 StatementExecutionStrategy::Standard
823 };
824 StatementEndedExecutionReason::Success {
825 result_size: Some(result_size),
826 rows_returned: Some(num_rows),
827 execution_strategy: Some(strategy),
828 }
829 }
830 PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
831 PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
832 };
833 otel_ctx.attach_as_parent();
834 self.retire_execution(reason, ctx_extra);
835 }
836 }
839
840 pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
842 let pending_peek = self.pending_peeks.remove(uuid);
843 if let Some(pending_peek) = &pending_peek {
844 let uuids = self
845 .client_pending_peeks
846 .get_mut(&pending_peek.conn_id)
847 .expect("coord peek state is inconsistent");
848 uuids.remove(uuid);
849 if uuids.is_empty() {
850 self.client_pending_peeks.remove(&pending_peek.conn_id);
851 }
852 }
853 pending_peek
854 }
855
856 pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
860 where
861 I: IntoRowIterator,
862 I::Iter: Send + Sync + 'static,
863 {
864 let rows = Box::new(rows.into_row_iter());
865 ExecuteResponse::SendingRowsImmediate { rows }
866 }
867}
868
#[cfg(test)]
mod tests {
    use mz_expr::func::IsNull;
    use mz_expr::{MapFilterProject, UnaryFunc};
    use mz_ore::str::Indent;
    use mz_repr::explain::text::text_string_at;
    use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
    use mz_repr::{ColumnType, Datum, ScalarType};

    use super::*;

    /// Checks the text rendering of every `FastPathPlan` shape that
    /// `fmt_text` distinguishes: an error constant, an index scan with and
    /// without a lookup, and constants with few and many rows.
    ///
    /// NOTE(review): the expected strings assume that `Indent::default()`
    /// indents by two spaces per level — confirm against `mz_ore::str::Indent`.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_fast_path_plan_as_text() {
        let typ = RelationType::new(vec![ColumnType {
            scalar_type: ScalarType::String,
            nullable: false,
        }]);
        let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
        let no_lookup = FastPathPlan::PeekExisting(
            GlobalId::User(8),
            GlobalId::User(10),
            None,
            MapFilterProject::new(4)
                .map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
                .project([1, 4])
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );
        let lookup = FastPathPlan::PeekExisting(
            GlobalId::User(9),
            GlobalId::User(11),
            Some(vec![Row::pack(Some(Datum::Int32(5)))]),
            MapFilterProject::new(3)
                .filter(Some(
                    MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
                ))
                .into_plan()
                .expect("invalid plan")
                .into_nontemporal()
                .expect("invalid nontemporal"),
        );

        let humanizer = DummyHumanizer;
        let config = ExplainConfig {
            redacted: false,
            ..Default::default()
        };
        let ctx_gen = || {
            let indent = Indent::default();
            let annotations = BTreeMap::new();
            PlanRenderingContext::<FastPathPlan>::new(indent, &humanizer, annotations, &config)
        };

        // Each nested operator is rendered one indentation level deeper.
        let constant_err_exp = "Error \"division by zero\"\n";
        let no_lookup_exp = "Project (#1, #4)\n  Map ((#0 OR #2))\n    ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
        let lookup_exp =
            "Filter (#0) IS NULL\n  ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";

        assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
        assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
        assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);

        let mut constant_rows = vec![
            (Row::pack(Some(Datum::String("hello"))), Diff::ONE),
            (Row::pack(Some(Datum::String("world"))), 2.into()),
            (Row::pack(Some(Datum::String("star"))), 500.into()),
        ];
        let constant_exp1 =
            "Constant\n  - (\"hello\")\n  - ((\"world\") x 2)\n  - ((\"star\") x 500)\n";
        assert_eq!(
            text_string_at(
                &FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
                ctx_gen
            ),
            constant_exp1
        );

        // With many rows only the first ones are listed, below a summary.
        constant_rows
            .extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), Diff::ONE)));
        let constant_exp2 = "Constant\n  total_rows (diffs absed): 523\n  first_rows:\n    - (\"hello\")\
        \n    - ((\"world\") x 2)\n    - ((\"star\") x 500)\n    - (\"0\")\n    - (\"1\")\
        \n    - (\"2\")\n    - (\"3\")\n    - (\"4\")\n    - (\"5\")\n    - (\"6\")\
        \n    - (\"7\")\n    - (\"8\")\n    - (\"9\")\n    - (\"10\")\n    - (\"11\")\
        \n    - (\"12\")\n    - (\"13\")\n    - (\"14\")\n    - (\"15\")\n    - (\"16\")\n";
        assert_eq!(
            text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
            constant_exp2
        );
    }
}