1use std::collections::BTreeMap;
11use std::sync::Arc;
12
13use differential_dataflow::consolidation::consolidate;
14use mz_compute_client::controller::error::{CollectionMissing, InstanceMissing};
15use mz_compute_client::controller::instance_client::InstanceClient;
16use mz_compute_client::controller::instance_client::{AcquireReadHoldsError, InstanceShutDown};
17use mz_compute_client::protocol::command::PeekTarget;
18use mz_compute_types::ComputeInstanceId;
19use mz_expr::row::RowCollection;
20use mz_ore::cast::CastFrom;
21use mz_persist_client::PersistClient;
22use mz_repr::GlobalId;
23use mz_repr::Timestamp;
24use mz_repr::global_id::TransientIdGen;
25use mz_repr::{RelationDesc, Row};
26use mz_sql::optimizer_metrics::OptimizerMetrics;
27use mz_sql::plan::Params;
28use mz_storage_types::sources::Timeline;
29use mz_timestamp_oracle::TimestampOracle;
30use prometheus::Histogram;
31use qcell::QCell;
32use thiserror::Error;
33use timely::progress::Antichain;
34use tokio::sync::oneshot;
35use uuid::Uuid;
36
37use crate::catalog::Catalog;
38use crate::command::{CatalogSnapshot, Command};
39use crate::coord::peek::FastPathPlan;
40use crate::coord::{Coordinator, ExecuteContextGuard};
41use crate::session::{LifecycleTimestamps, Session};
42use crate::statement_logging::{
43 FrontendStatementLoggingEvent, PreparedStatementEvent, PreparedStatementLoggingInfo,
44 StatementLoggingFrontend, StatementLoggingId, WatchSetCreation,
45};
46use crate::{AdapterError, Client, CollectionIdBundle, ReadHolds, statement_logging};
47
48pub type StorageCollectionsHandle =
50 Arc<dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync>;
51
/// Client for executing peeks, including fast-path peeks
/// (see [`PeekClient::implement_fast_path_peek_plan`]).
///
/// It communicates with the coordinator by sending [`Command`]s. It also lazily
/// caches compute instance clients and timestamp oracles that it fetches from
/// the coordinator.
#[derive(Debug)]
pub struct PeekClient {
    /// Client used to send [`Command`]s to the coordinator.
    coordinator_client: Client,
    /// Lazily populated cache of compute instance clients, keyed by instance ID.
    /// See [`PeekClient::ensure_compute_instance_client`].
    compute_instances: BTreeMap<ComputeInstanceId, InstanceClient>,
    /// Used to acquire storage read holds, read storage write frontiers, and
    /// look up collection metadata for persist fast-path peeks.
    pub storage_collections: StorageCollectionsHandle,
    /// Shared generator of transient IDs.
    pub transient_id_gen: Arc<TransientIdGen>,
    /// Optimizer metrics.
    pub optimizer_metrics: OptimizerMetrics,
    /// Lazily populated cache of timestamp oracles, keyed by timeline.
    /// See [`PeekClient::ensure_oracle`].
    oracles: BTreeMap<Timeline, Arc<dyn TimestampOracle<Timestamp> + Send + Sync>>,
    /// Persist client handed to peek response streams.
    persist_client: PersistClient,
    /// Statement logging state. It is used to begin statement executions and
    /// as the clock (`now`) for logged events.
    pub statement_logging_frontend: StatementLoggingFrontend,
}
72
73impl PeekClient {
74 pub fn new(
76 coordinator_client: Client,
77 storage_collections: StorageCollectionsHandle,
78 transient_id_gen: Arc<TransientIdGen>,
79 optimizer_metrics: OptimizerMetrics,
80 persist_client: PersistClient,
81 statement_logging_frontend: StatementLoggingFrontend,
82 ) -> Self {
83 Self {
84 coordinator_client,
85 compute_instances: Default::default(), storage_collections,
87 transient_id_gen,
88 optimizer_metrics,
89 statement_logging_frontend,
90 oracles: Default::default(), persist_client,
92 }
93 }
94
95 pub async fn ensure_compute_instance_client(
96 &mut self,
97 compute_instance: ComputeInstanceId,
98 ) -> Result<InstanceClient, InstanceMissing> {
99 if !self.compute_instances.contains_key(&compute_instance) {
100 let client = self
101 .call_coordinator(|tx| Command::GetComputeInstanceClient {
102 instance_id: compute_instance,
103 tx,
104 })
105 .await?;
106 self.compute_instances.insert(compute_instance, client);
107 }
108 Ok(self
109 .compute_instances
110 .get(&compute_instance)
111 .expect("ensured above")
112 .clone())
113 }
114
115 pub async fn ensure_oracle(
116 &mut self,
117 timeline: Timeline,
118 ) -> Result<&mut Arc<dyn TimestampOracle<Timestamp> + Send + Sync>, AdapterError> {
119 if !self.oracles.contains_key(&timeline) {
120 let oracle = self
121 .call_coordinator(|tx| Command::GetOracle {
122 timeline: timeline.clone(),
123 tx,
124 })
125 .await?;
126 self.oracles.insert(timeline.clone(), oracle);
127 }
128 Ok(self.oracles.get_mut(&timeline).expect("ensured above"))
129 }
130
131 pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
134 let start = std::time::Instant::now();
135 let CatalogSnapshot { catalog } = self
136 .call_coordinator(|tx| Command::CatalogSnapshot { tx })
137 .await;
138 self.coordinator_client
139 .metrics()
140 .catalog_snapshot_seconds
141 .with_label_values(&[context])
142 .observe(start.elapsed().as_secs_f64());
143 catalog
144 }
145
146 pub(crate) async fn call_coordinator<T, F>(&self, f: F) -> T
147 where
148 F: FnOnce(oneshot::Sender<T>) -> Command,
149 {
150 let (tx, rx) = oneshot::channel();
151 self.coordinator_client.send(f(tx));
152 rx.await
153 .expect("if the coordinator is still alive, it shouldn't have dropped our call")
154 }
155
156 pub async fn acquire_read_holds_and_least_valid_write(
169 &mut self,
170 id_bundle: &CollectionIdBundle,
171 ) -> Result<(ReadHolds, Antichain<Timestamp>), CollectionLookupError> {
172 let mut read_holds = ReadHolds::new();
173 let mut upper = Antichain::new();
174
175 if !id_bundle.storage_ids.is_empty() {
176 let desired_storage: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
177 let storage_read_holds = self
178 .storage_collections
179 .acquire_read_holds(desired_storage)?;
180 read_holds.storage_holds = storage_read_holds
181 .into_iter()
182 .map(|hold| (hold.id(), hold))
183 .collect();
184
185 let storage_ids: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
186 for f in self
187 .storage_collections
188 .collections_frontiers(storage_ids)?
189 {
190 upper.extend(f.write_frontier);
191 }
192 }
193
194 for (&instance_id, collection_ids) in &id_bundle.compute_ids {
195 let client = self.ensure_compute_instance_client(instance_id).await?;
196
197 for (id, read_hold, write_frontier) in client
198 .acquire_read_holds_and_collection_write_frontiers(
199 collection_ids.iter().copied().collect(),
200 )
201 .await?
202 {
203 let prev = read_holds
204 .compute_holds
205 .insert((instance_id, id), read_hold);
206 assert!(
207 prev.is_none(),
208 "duplicate compute ID in id_bundle {id_bundle:?}"
209 );
210
211 upper.extend(write_frontier);
212 }
213 }
214
215 Ok((read_holds, upper))
216 }
217
218 pub async fn implement_fast_path_peek_plan(
227 &mut self,
228 fast_path: FastPathPlan,
229 timestamp: Timestamp,
230 finishing: mz_expr::RowSetFinishing,
231 compute_instance: ComputeInstanceId,
232 target_replica: Option<mz_cluster_client::ReplicaId>,
233 intermediate_result_type: mz_repr::SqlRelationType,
234 max_result_size: u64,
235 max_returned_query_size: Option<u64>,
236 row_set_finishing_seconds: Histogram,
237 input_read_holds: ReadHolds,
238 peek_stash_read_batch_size_bytes: usize,
239 peek_stash_read_memory_budget_bytes: usize,
240 conn_id: mz_adapter_types::connection::ConnectionId,
241 depends_on: std::collections::BTreeSet<mz_repr::GlobalId>,
242 watch_set: Option<WatchSetCreation>,
243 ) -> Result<crate::ExecuteResponse, AdapterError> {
244 if let FastPathPlan::Constant(rows_res, _) = fast_path {
246 if let Some(ref ws) = watch_set {
249 self.log_lifecycle_event(
250 ws.logging_id,
251 statement_logging::StatementLifecycleEvent::StorageDependenciesFinished,
252 );
253 self.log_lifecycle_event(
254 ws.logging_id,
255 statement_logging::StatementLifecycleEvent::ComputeDependenciesFinished,
256 );
257 }
258
259 let mut rows = match rows_res {
260 Ok(rows) => rows,
261 Err(e) => return Err(e.into()),
262 };
263 consolidate(&mut rows);
264
265 let mut results = Vec::new();
266 for (row, count) in rows {
267 let count = match u64::try_from(count.into_inner()) {
268 Ok(u) => usize::cast_from(u),
269 Err(_) => {
270 return Err(AdapterError::Unstructured(anyhow::anyhow!(
271 "Negative multiplicity in constant result: {}",
272 count
273 )));
274 }
275 };
276 match std::num::NonZeroUsize::new(count) {
277 Some(nzu) => {
278 results.push((row, nzu));
279 }
280 None => {
281 }
283 };
284 }
285 let row_collection = RowCollection::new(results, &finishing.order_by);
286 return match finishing.finish(
287 row_collection,
288 max_result_size,
289 max_returned_query_size,
290 &row_set_finishing_seconds,
291 ) {
292 Ok((rows, _bytes)) => Ok(Coordinator::send_immediate_rows(rows)),
293 Err(e) => Err(AdapterError::ResultSize(e)),
295 };
296 }
297
298 let (peek_target, target_read_hold, literal_constraints, mfp, strategy) = match fast_path {
299 FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, mfp) => {
300 let peek_target = PeekTarget::Index { id: idx_id };
301 let target_read_hold = input_read_holds
302 .compute_holds
303 .get(&(compute_instance, idx_id))
304 .expect("missing compute read hold on PeekExisting peek target")
305 .clone();
306 let strategy = statement_logging::StatementExecutionStrategy::FastPath;
307 (
308 peek_target,
309 target_read_hold,
310 literal_constraints,
311 mfp,
312 strategy,
313 )
314 }
315 FastPathPlan::PeekPersist(coll_id, literal_constraint, mfp) => {
316 let literal_constraints = literal_constraint.map(|r| vec![r]);
317 let metadata = self
318 .storage_collections
319 .collection_metadata(coll_id)
320 .map_err(AdapterError::concurrent_dependency_drop_from_collection_missing)?
321 .clone();
322 let peek_target = PeekTarget::Persist {
323 id: coll_id,
324 metadata,
325 };
326 let target_read_hold = input_read_holds
327 .storage_holds
328 .get(&coll_id)
329 .expect("missing storage read hold on PeekPersist peek target")
330 .clone();
331 let strategy = statement_logging::StatementExecutionStrategy::PersistFastPath;
332 (
333 peek_target,
334 target_read_hold,
335 literal_constraints,
336 mfp,
337 strategy,
338 )
339 }
340 FastPathPlan::Constant(..) => {
341 unreachable!()
343 }
344 };
345
346 let (rows_tx, rows_rx) = oneshot::channel();
347 let uuid = Uuid::new_v4();
348
349 let cols = (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
352 let result_desc = RelationDesc::new(intermediate_result_type.clone(), cols);
353
354 let client = self
355 .ensure_compute_instance_client(compute_instance)
356 .await
357 .map_err(AdapterError::concurrent_dependency_drop_from_instance_missing)?;
358
359 self.call_coordinator(|tx| Command::RegisterFrontendPeek {
364 uuid,
365 conn_id: conn_id.clone(),
366 cluster_id: compute_instance,
367 depends_on,
368 is_fast_path: true,
369 watch_set,
370 tx,
371 })
372 .await?;
373
374 let finishing_for_instance = finishing.clone();
375 let peek_result = client
376 .peek(
377 peek_target,
378 literal_constraints,
379 uuid,
380 timestamp,
381 result_desc,
382 finishing_for_instance,
383 mfp,
384 target_read_hold,
385 target_replica,
386 rows_tx,
387 )
388 .await;
389
390 if let Err(err) = peek_result {
391 self.call_coordinator(|tx| Command::UnregisterFrontendPeek { uuid, tx })
394 .await;
395 return Err(
396 AdapterError::concurrent_dependency_drop_from_instance_peek_error(
397 err,
398 compute_instance,
399 ),
400 );
401 }
402
403 let peek_response_stream = Coordinator::create_peek_response_stream(
404 rows_rx,
405 finishing,
406 max_result_size,
407 max_returned_query_size,
408 row_set_finishing_seconds,
409 self.persist_client.clone(),
410 peek_stash_read_batch_size_bytes,
411 peek_stash_read_memory_budget_bytes,
412 );
413
414 Ok(crate::ExecuteResponse::SendingRowsStreaming {
415 rows: Box::pin(peek_response_stream),
416 instance_id: compute_instance,
417 strategy,
418 })
419 }
420
421 pub(crate) fn begin_statement_logging(
433 &self,
434 session: &mut Session,
435 params: &Params,
436 logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
437 catalog: &Catalog,
438 lifecycle_timestamps: Option<LifecycleTimestamps>,
439 outer_ctx_extra: &mut Option<ExecuteContextGuard>,
440 ) -> StatementLoggingGuard {
441 let id = if outer_ctx_extra.is_none() {
442 let result = self.statement_logging_frontend.begin_statement_execution(
444 session,
445 params,
446 logging,
447 catalog.system_config(),
448 lifecycle_timestamps,
449 );
450
451 if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
452 self.log_began_execution(began_execution, mseh_update, prepared_statement);
453 Some(logging_id)
454 } else {
455 None
456 }
457 } else {
458 outer_ctx_extra
462 .take()
463 .and_then(|guard| guard.defuse().retire())
464 };
465
466 StatementLoggingGuard {
467 id,
468 coordinator_client: self.coordinator_client.clone(),
469 now: self.statement_logging_frontend.now.clone(),
470 }
471 }
472
473 pub(crate) fn log_began_execution(
475 &self,
476 record: statement_logging::StatementBeganExecutionRecord,
477 mseh_update: Row,
478 prepared_statement: Option<PreparedStatementEvent>,
479 ) {
480 self.coordinator_client
481 .send(Command::FrontendStatementLogging(
482 FrontendStatementLoggingEvent::BeganExecution {
483 record,
484 mseh_update,
485 prepared_statement,
486 },
487 ));
488 }
489
490 pub(crate) fn log_set_cluster(
492 &self,
493 id: StatementLoggingId,
494 cluster_id: mz_controller_types::ClusterId,
495 ) {
496 self.coordinator_client
497 .send(Command::FrontendStatementLogging(
498 FrontendStatementLoggingEvent::SetCluster { id, cluster_id },
499 ));
500 }
501
502 pub(crate) fn log_set_timestamp(&self, id: StatementLoggingId, timestamp: mz_repr::Timestamp) {
504 self.coordinator_client
505 .send(Command::FrontendStatementLogging(
506 FrontendStatementLoggingEvent::SetTimestamp { id, timestamp },
507 ));
508 }
509
510 pub(crate) fn log_set_transient_index_id(
512 &self,
513 id: StatementLoggingId,
514 transient_index_id: mz_repr::GlobalId,
515 ) {
516 self.coordinator_client
517 .send(Command::FrontendStatementLogging(
518 FrontendStatementLoggingEvent::SetTransientIndex {
519 id,
520 transient_index_id,
521 },
522 ));
523 }
524
525 pub(crate) fn log_lifecycle_event(
527 &self,
528 id: StatementLoggingId,
529 event: statement_logging::StatementLifecycleEvent,
530 ) {
531 let when = (self.statement_logging_frontend.now)();
532 self.coordinator_client
533 .send(Command::FrontendStatementLogging(
534 FrontendStatementLoggingEvent::Lifecycle { id, event, when },
535 ));
536 }
537
538 pub(crate) fn log_ended_execution(
543 &self,
544 id: StatementLoggingId,
545 reason: statement_logging::StatementEndedExecutionReason,
546 ) {
547 let ended_at = (self.statement_logging_frontend.now)();
548 let record = statement_logging::StatementEndedExecutionRecord {
549 id: id.0,
550 reason,
551 ended_at,
552 };
553 self.coordinator_client
554 .send(Command::FrontendStatementLogging(
555 FrontendStatementLoggingEvent::EndedExecution(record),
556 ));
557 }
558}
559
/// Guard that tracks the statement logging ID of one statement execution.
///
/// If the guard is dropped while it still holds an ID, its `Drop` impl sends an
/// `EndedExecution` record with reason `Aborted` to the coordinator. `defuse`
/// consumes the guard without sending anything.
#[must_use = "StatementLoggingGuard must be explicitly retired or handed off; \
              otherwise `Drop` will log the statement as Aborted"]
pub(crate) struct StatementLoggingGuard {
    /// Logging ID of the execution, or `None` if there is nothing to log.
    id: Option<StatementLoggingId>,
    /// Used to send the end-of-execution record to the coordinator.
    coordinator_client: Client,
    /// Clock used to timestamp the end-of-execution record.
    now: mz_ore::now::NowFn,
}
583
584impl StatementLoggingGuard {
585 pub(crate) fn id(&self) -> Option<StatementLoggingId> {
587 self.id
588 }
589
590 pub(crate) fn defuse(mut self) {
594 self.id = None;
595 }
596
597 fn emit(&mut self, reason: statement_logging::StatementEndedExecutionReason) {
598 let Some(id) = self.id.take() else {
599 return;
600 };
601 let ended_at = (self.now)();
602 let record = statement_logging::StatementEndedExecutionRecord {
603 id: id.0,
604 reason,
605 ended_at,
606 };
607 self.coordinator_client
608 .send(Command::FrontendStatementLogging(
609 FrontendStatementLoggingEvent::EndedExecution(record),
610 ));
611 }
612}
613
614impl Drop for StatementLoggingGuard {
615 fn drop(&mut self) {
616 self.emit(statement_logging::StatementEndedExecutionReason::Aborted);
619 }
620}
621
/// Errors from looking up collections or compute instances, e.g. in
/// [`PeekClient::acquire_read_holds_and_least_valid_write`].
#[derive(Error, Debug)]
pub enum CollectionLookupError {
    /// The compute instance with the given ID does not exist.
    #[error("instance does not exist: {0}")]
    InstanceMissing(ComputeInstanceId),
    /// The compute instance has shut down.
    #[error("the instance has shut down")]
    InstanceShutDown,
    /// The collection with the given ID does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
}
635
636impl From<InstanceMissing> for CollectionLookupError {
637 fn from(error: InstanceMissing) -> Self {
638 Self::InstanceMissing(error.0)
639 }
640}
641
642impl From<InstanceShutDown> for CollectionLookupError {
643 fn from(_error: InstanceShutDown) -> Self {
644 Self::InstanceShutDown
645 }
646}
647
648impl From<CollectionMissing> for CollectionLookupError {
649 fn from(error: CollectionMissing) -> Self {
650 Self::CollectionMissing(error.0)
651 }
652}
653
654impl From<AcquireReadHoldsError> for CollectionLookupError {
655 fn from(error: AcquireReadHoldsError) -> Self {
656 match error {
657 AcquireReadHoldsError::CollectionMissing(id) => Self::CollectionMissing(id),
658 AcquireReadHoldsError::InstanceShutDown => Self::InstanceShutDown,
659 }
660 }
661}