1use std::collections::BTreeMap;
11use std::sync::Arc;
12
13use differential_dataflow::consolidation::consolidate;
14use mz_compute_client::controller::error::{CollectionMissing, InstanceMissing};
15use mz_compute_client::controller::instance_client::InstanceClient;
16use mz_compute_client::controller::instance_client::{AcquireReadHoldsError, InstanceShutDown};
17use mz_compute_client::protocol::command::PeekTarget;
18use mz_compute_types::ComputeInstanceId;
19use mz_expr::row::RowCollection;
20use mz_ore::cast::CastFrom;
21use mz_persist_client::PersistClient;
22use mz_repr::GlobalId;
23use mz_repr::Timestamp;
24use mz_repr::global_id::TransientIdGen;
25use mz_repr::{RelationDesc, Row};
26use mz_sql::optimizer_metrics::OptimizerMetrics;
27use mz_storage_types::sources::Timeline;
28use mz_timestamp_oracle::TimestampOracle;
29use prometheus::Histogram;
30use thiserror::Error;
31use timely::progress::Antichain;
32use tokio::sync::oneshot;
33use uuid::Uuid;
34
35use crate::catalog::Catalog;
36use crate::command::{CatalogSnapshot, Command};
37use crate::coord::Coordinator;
38use crate::coord::peek::FastPathPlan;
39use crate::statement_logging::WatchSetCreation;
40use crate::statement_logging::{
41 FrontendStatementLoggingEvent, PreparedStatementEvent, StatementLoggingFrontend,
42 StatementLoggingId,
43};
44use crate::{AdapterError, Client, CollectionIdBundle, ReadHolds, statement_logging};
45
/// A shared, thread-safe handle to the storage collections trait object,
/// pinned to the adapter's [`Timestamp`] type.
///
/// Held by [`PeekClient`] so that storage read holds and collection frontiers
/// can be queried without a round trip to the coordinator.
pub type StorageCollectionsHandle = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = Timestamp>
        + Send
        + Sync,
>;
52
/// A client used by session/frontend tasks to implement fast-path peeks,
/// talking to the coordinator (via `coordinator_client`) only for the pieces
/// that must go through it (catalog snapshots, oracles, peek registration,
/// statement logging).
#[derive(Debug)]
pub struct PeekClient {
    /// Channel to the coordinator; all `Command`s are sent through this
    /// (see `call_coordinator`).
    coordinator_client: Client,
    /// Lazily-populated cache of per-compute-instance clients
    /// (see `ensure_compute_instance_client`).
    compute_instances: BTreeMap<ComputeInstanceId, InstanceClient<Timestamp>>,
    /// Direct handle to storage collections, used for acquiring read holds,
    /// reading write frontiers, and looking up collection metadata.
    pub storage_collections: StorageCollectionsHandle,
    /// Generator for transient `GlobalId`s.
    pub transient_id_gen: Arc<TransientIdGen>,
    /// Metrics reported by the optimizer.
    pub optimizer_metrics: OptimizerMetrics,
    /// Lazily-populated cache of per-timeline timestamp oracles
    /// (see `ensure_oracle`).
    oracles: BTreeMap<Timeline, Arc<dyn TimestampOracle<Timestamp> + Send + Sync>>,
    /// Persist client, passed along when constructing peek response streams.
    persist_client: PersistClient,
    /// Frontend half of statement logging; also supplies the `now` closure
    /// used to timestamp lifecycle events.
    pub statement_logging_frontend: StatementLoggingFrontend,
}
73
impl PeekClient {
    /// Creates a new `PeekClient`.
    ///
    /// The compute-instance client cache and the timestamp-oracle cache start
    /// empty and are filled lazily on first use (see
    /// `ensure_compute_instance_client` and `ensure_oracle`).
    pub fn new(
        coordinator_client: Client,
        storage_collections: StorageCollectionsHandle,
        transient_id_gen: Arc<TransientIdGen>,
        optimizer_metrics: OptimizerMetrics,
        persist_client: PersistClient,
        statement_logging_frontend: StatementLoggingFrontend,
    ) -> Self {
        Self {
            coordinator_client,
            compute_instances: Default::default(),
            storage_collections,
            transient_id_gen,
            optimizer_metrics,
            statement_logging_frontend,
            oracles: Default::default(),
            persist_client,
        }
    }

    /// Returns a client for the given compute instance, fetching one from the
    /// coordinator and caching it if we don't have it yet.
    ///
    /// # Errors
    ///
    /// Returns `InstanceMissing` if the coordinator does not know the
    /// instance.
    ///
    /// NOTE(review): cached clients are never invalidated here; presumably a
    /// client for a since-dropped instance surfaces errors on use — confirm.
    pub async fn ensure_compute_instance_client(
        &mut self,
        compute_instance: ComputeInstanceId,
    ) -> Result<InstanceClient<Timestamp>, InstanceMissing> {
        if !self.compute_instances.contains_key(&compute_instance) {
            let client = self
                .call_coordinator(|tx| Command::GetComputeInstanceClient {
                    instance_id: compute_instance,
                    tx,
                })
                .await?;
            self.compute_instances.insert(compute_instance, client);
        }
        Ok(self
            .compute_instances
            .get(&compute_instance)
            .expect("ensured above")
            .clone())
    }

    /// Returns the timestamp oracle for the given timeline, fetching it from
    /// the coordinator and caching it if we don't have it yet.
    pub async fn ensure_oracle(
        &mut self,
        timeline: Timeline,
    ) -> Result<&mut Arc<dyn TimestampOracle<Timestamp> + Send + Sync>, AdapterError> {
        if !self.oracles.contains_key(&timeline) {
            let oracle = self
                .call_coordinator(|tx| Command::GetOracle {
                    timeline: timeline.clone(),
                    tx,
                })
                .await?;
            self.oracles.insert(timeline.clone(), oracle);
        }
        Ok(self.oracles.get_mut(&timeline).expect("ensured above"))
    }

    /// Fetches a catalog snapshot from the coordinator.
    ///
    /// The elapsed time of the round trip is recorded in the
    /// `catalog_snapshot_seconds` histogram, labeled with `context`.
    pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
        let start = std::time::Instant::now();
        let CatalogSnapshot { catalog } = self
            .call_coordinator(|tx| Command::CatalogSnapshot { tx })
            .await;
        self.coordinator_client
            .metrics()
            .catalog_snapshot_seconds
            .with_label_values(&[context])
            .observe(start.elapsed().as_secs_f64());
        catalog
    }

    /// Sends a command to the coordinator, where the command carries a
    /// oneshot sender for the reply, and awaits the response.
    ///
    /// # Panics
    ///
    /// Panics if the coordinator drops the response channel without replying.
    pub(crate) async fn call_coordinator<T, F>(&self, f: F) -> T
    where
        F: FnOnce(oneshot::Sender<T>) -> Command,
    {
        let (tx, rx) = oneshot::channel();
        self.coordinator_client.send(f(tx));
        rx.await
            .expect("if the coordinator is still alive, it shouldn't have dropped our call")
    }

    /// Acquires read holds on every collection in `id_bundle` (storage and
    /// compute) and returns them, together with an antichain accumulated from
    /// all of those collections' write frontiers.
    ///
    /// The returned antichain is built via `Antichain::extend` over every
    /// write frontier, so it holds the minimal elements across all of them.
    pub async fn acquire_read_holds_and_least_valid_write(
        &mut self,
        id_bundle: &CollectionIdBundle,
    ) -> Result<(ReadHolds<Timestamp>, Antichain<Timestamp>), CollectionLookupError> {
        let mut read_holds = ReadHolds::new();
        let mut upper = Antichain::new();

        if !id_bundle.storage_ids.is_empty() {
            let desired_storage: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
            // Acquire the storage holds first, then read the frontiers; the
            // holds keep the collections readable in the meantime.
            let storage_read_holds = self
                .storage_collections
                .acquire_read_holds(desired_storage)?;
            read_holds.storage_holds = storage_read_holds
                .into_iter()
                .map(|hold| (hold.id(), hold))
                .collect();

            let storage_ids: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
            for f in self
                .storage_collections
                .collections_frontiers(storage_ids)?
            {
                upper.extend(f.write_frontier);
            }
        }

        for (&instance_id, collection_ids) in &id_bundle.compute_ids {
            let client = self.ensure_compute_instance_client(instance_id).await?;

            // The compute client hands back holds and write frontiers in one
            // call, so there is no window between the two.
            for (id, read_hold, write_frontier) in client
                .acquire_read_holds_and_collection_write_frontiers(
                    collection_ids.iter().copied().collect(),
                )
                .await?
            {
                let prev = read_holds
                    .compute_holds
                    .insert((instance_id, id), read_hold);
                assert!(
                    prev.is_none(),
                    "duplicate compute ID in id_bundle {id_bundle:?}"
                );

                upper.extend(write_frontier);
            }
        }

        Ok((read_holds, upper))
    }

    /// Executes a fast-path peek plan and returns the `ExecuteResponse`.
    ///
    /// * `FastPathPlan::Constant` is answered locally: the rows are
    ///   consolidated and finished without contacting any compute instance.
    /// * `FastPathPlan::PeekExisting` peeks an existing index;
    ///   `FastPathPlan::PeekPersist` peeks a persist collection directly. In
    ///   both cases the peek is first registered with the coordinator
    ///   (`RegisterFrontendPeek`), then issued to the instance, and the
    ///   response is returned as a streaming `SendingRowsStreaming`.
    ///
    /// `input_read_holds` must contain a hold on the peek target collection;
    /// this is asserted via `expect` below.
    ///
    /// # Errors
    ///
    /// Returns an `AdapterError` for negative multiplicities in constant
    /// results, result-size violations, concurrently dropped dependencies,
    /// and failures to register or issue the peek.
    pub async fn implement_fast_path_peek_plan(
        &mut self,
        fast_path: FastPathPlan,
        timestamp: Timestamp,
        finishing: mz_expr::RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<mz_cluster_client::ReplicaId>,
        intermediate_result_type: mz_repr::SqlRelationType,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
        row_set_finishing_seconds: Histogram,
        input_read_holds: ReadHolds<Timestamp>,
        peek_stash_read_batch_size_bytes: usize,
        peek_stash_read_memory_budget_bytes: usize,
        conn_id: mz_adapter_types::connection::ConnectionId,
        depends_on: std::collections::BTreeSet<mz_repr::GlobalId>,
        watch_set: Option<WatchSetCreation>,
    ) -> Result<crate::ExecuteResponse, AdapterError> {
        if let FastPathPlan::Constant(rows_res, _) = fast_path {
            // A constant peek has no storage/compute dependencies to wait
            // for, so both "dependencies finished" lifecycle events can be
            // logged immediately.
            if let Some(ref ws) = watch_set {
                self.log_lifecycle_event(
                    ws.logging_id,
                    statement_logging::StatementLifecycleEvent::StorageDependenciesFinished,
                );
                self.log_lifecycle_event(
                    ws.logging_id,
                    statement_logging::StatementLifecycleEvent::ComputeDependenciesFinished,
                );
            }

            let mut rows = match rows_res {
                Ok(rows) => rows,
                Err(e) => return Err(e.into()),
            };
            // Merge duplicate rows so each row appears once with a summed
            // multiplicity.
            consolidate(&mut rows);

            let mut results = Vec::new();
            for (row, count) in rows {
                let count = match u64::try_from(count.into_inner()) {
                    Ok(u) => usize::cast_from(u),
                    Err(_) => {
                        // Negative multiplicities cannot be rendered as rows.
                        return Err(AdapterError::Unstructured(anyhow::anyhow!(
                            "Negative multiplicity in constant result: {}",
                            count
                        )));
                    }
                };
                match std::num::NonZeroUsize::new(count) {
                    Some(nzu) => {
                        results.push((row, nzu));
                    }
                    None => {
                        // Zero multiplicity: the row vanished after
                        // consolidation; skip it.
                    }
                };
            }
            let row_collection = RowCollection::new(results, &finishing.order_by);
            return match finishing.finish(
                row_collection,
                max_result_size,
                max_returned_query_size,
                &row_set_finishing_seconds,
            ) {
                Ok((rows, _bytes)) => Ok(Coordinator::send_immediate_rows(rows)),
                Err(e) => Err(AdapterError::ResultSize(e)),
            };
        }

        // Resolve the peek target, the read hold that keeps it readable, and
        // the execution strategy to record for statement logging.
        let (peek_target, target_read_hold, literal_constraints, mfp, strategy) = match fast_path {
            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, mfp) => {
                let peek_target = PeekTarget::Index { id: idx_id };
                let target_read_hold = input_read_holds
                    .compute_holds
                    .get(&(compute_instance, idx_id))
                    .expect("missing compute read hold on PeekExisting peek target")
                    .clone();
                let strategy = statement_logging::StatementExecutionStrategy::FastPath;
                (
                    peek_target,
                    target_read_hold,
                    literal_constraints,
                    mfp,
                    strategy,
                )
            }
            FastPathPlan::PeekPersist(coll_id, literal_constraint, mfp) => {
                let literal_constraints = literal_constraint.map(|r| vec![r]);
                let metadata = self
                    .storage_collections
                    .collection_metadata(coll_id)
                    .map_err(AdapterError::concurrent_dependency_drop_from_collection_missing)?
                    .clone();
                let peek_target = PeekTarget::Persist {
                    id: coll_id,
                    metadata,
                };
                let target_read_hold = input_read_holds
                    .storage_holds
                    .get(&coll_id)
                    .expect("missing storage read hold on PeekPersist peek target")
                    .clone();
                let strategy = statement_logging::StatementExecutionStrategy::PersistFastPath;
                (
                    peek_target,
                    target_read_hold,
                    literal_constraints,
                    mfp,
                    strategy,
                )
            }
            _ => {
                // `FastPathPlan::Constant` was already handled by the early
                // return above, so no other variant can reach this match.
                unreachable!()
            }
        };

        let (rows_tx, rows_rx) = oneshot::channel();
        let uuid = Uuid::new_v4();

        // Synthesize column names for the intermediate result; they are
        // internal only and never user-visible.
        let cols = (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
        let result_desc = RelationDesc::new(intermediate_result_type.clone(), cols);

        let client = self
            .ensure_compute_instance_client(compute_instance)
            .await
            .map_err(AdapterError::concurrent_dependency_drop_from_instance_missing)?;

        // Register the peek with the coordinator before issuing it, so the
        // coordinator knows about it (e.g. for cancellation) by the time any
        // response can arrive.
        self.call_coordinator(|tx| Command::RegisterFrontendPeek {
            uuid,
            conn_id: conn_id.clone(),
            cluster_id: compute_instance,
            depends_on,
            is_fast_path: true,
            watch_set,
            tx,
        })
        .await?;

        let finishing_for_instance = finishing.clone();
        let peek_result = client
            .peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing_for_instance,
                mfp,
                target_read_hold,
                target_replica,
                rows_tx,
            )
            .await;

        if let Err(err) = peek_result {
            // Issuing the peek failed: undo the registration above so the
            // coordinator does not track a peek that never ran.
            self.call_coordinator(|tx| Command::UnregisterFrontendPeek { uuid, tx })
                .await;
            return Err(
                AdapterError::concurrent_dependency_drop_from_instance_peek_error(
                    err,
                    compute_instance,
                ),
            );
        }

        // Stream results back, spilling/reading via persist as configured by
        // the peek-stash budgets.
        let peek_response_stream = Coordinator::create_peek_response_stream(
            rows_rx,
            finishing,
            max_result_size,
            max_returned_query_size,
            row_set_finishing_seconds,
            self.persist_client.clone(),
            peek_stash_read_batch_size_bytes,
            peek_stash_read_memory_budget_bytes,
        );

        Ok(crate::ExecuteResponse::SendingRowsStreaming {
            rows: Box::pin(peek_response_stream),
            instance_id: compute_instance,
            strategy,
        })
    }

    /// Forwards a "began execution" statement-logging event to the
    /// coordinator.
    pub(crate) fn log_began_execution(
        &self,
        record: statement_logging::StatementBeganExecutionRecord,
        mseh_update: Row,
        prepared_statement: Option<PreparedStatementEvent>,
    ) {
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::BeganExecution {
                    record,
                    mseh_update,
                    prepared_statement,
                },
            ));
    }

    /// Records the cluster a statement executes on.
    pub(crate) fn log_set_cluster(
        &self,
        id: StatementLoggingId,
        cluster_id: mz_controller_types::ClusterId,
    ) {
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::SetCluster { id, cluster_id },
            ));
    }

    /// Records the timestamp a statement executes at.
    pub(crate) fn log_set_timestamp(&self, id: StatementLoggingId, timestamp: mz_repr::Timestamp) {
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::SetTimestamp { id, timestamp },
            ));
    }

    /// Records the transient index a statement's peek targets.
    pub(crate) fn log_set_transient_index_id(
        &self,
        id: StatementLoggingId,
        transient_index_id: mz_repr::GlobalId,
    ) {
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::SetTransientIndex {
                    id,
                    transient_index_id,
                },
            ));
    }

    /// Forwards a statement lifecycle event, timestamped with the frontend's
    /// `now` closure at the time of this call.
    pub(crate) fn log_lifecycle_event(
        &self,
        id: StatementLoggingId,
        event: statement_logging::StatementLifecycleEvent,
    ) {
        let when = (self.statement_logging_frontend.now)();
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::Lifecycle { id, event, when },
            ));
    }

    /// Forwards an "ended execution" statement-logging event, timestamped
    /// with the frontend's `now` closure at the time of this call.
    pub(crate) fn log_ended_execution(
        &self,
        id: StatementLoggingId,
        reason: statement_logging::StatementEndedExecutionReason,
    ) {
        let ended_at = (self.statement_logging_frontend.now)();
        let record = statement_logging::StatementEndedExecutionRecord {
            id: id.0,
            reason,
            ended_at,
        };
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::EndedExecution(record),
            ));
    }
}
507
/// Errors that can occur while looking up collections and acquiring read
/// holds in `acquire_read_holds_and_least_valid_write`.
///
/// Unifies the error types of the storage and compute paths; see the `From`
/// impls below.
#[derive(Error, Debug)]
pub enum CollectionLookupError {
    /// The referenced compute instance does not exist.
    #[error("instance does not exist: {0}")]
    InstanceMissing(ComputeInstanceId),
    /// The compute instance exists but has shut down.
    #[error("the instance has shut down")]
    InstanceShutDown,
    /// The referenced collection does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
}
521
522impl From<InstanceMissing> for CollectionLookupError {
523 fn from(error: InstanceMissing) -> Self {
524 Self::InstanceMissing(error.0)
525 }
526}
527
528impl From<InstanceShutDown> for CollectionLookupError {
529 fn from(_error: InstanceShutDown) -> Self {
530 Self::InstanceShutDown
531 }
532}
533
534impl From<CollectionMissing> for CollectionLookupError {
535 fn from(error: CollectionMissing) -> Self {
536 Self::CollectionMissing(error.0)
537 }
538}
539
540impl From<AcquireReadHoldsError> for CollectionLookupError {
541 fn from(error: AcquireReadHoldsError) -> Self {
542 match error {
543 AcquireReadHoldsError::CollectionMissing(id) => Self::CollectionMissing(id),
544 AcquireReadHoldsError::InstanceShutDown => Self::InstanceShutDown,
545 }
546 }
547}