1use std::collections::BTreeMap;
11use std::sync::Arc;
12
13use differential_dataflow::consolidation::consolidate;
14use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing};
15use mz_compute_client::protocol::command::PeekTarget;
16use mz_compute_types::ComputeInstanceId;
17use mz_expr::row::RowCollection;
18use mz_ore::cast::CastFrom;
19use mz_persist_client::PersistClient;
20use mz_repr::Timestamp;
21use mz_repr::global_id::TransientIdGen;
22use mz_repr::{RelationDesc, Row};
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_storage_types::sources::Timeline;
25use mz_timestamp_oracle::TimestampOracle;
26use prometheus::Histogram;
27use timely::progress::Antichain;
28use tokio::sync::oneshot;
29use uuid::Uuid;
30
31use crate::catalog::Catalog;
32use crate::command::{CatalogSnapshot, Command};
33use crate::coord::Coordinator;
34use crate::coord::peek::FastPathPlan;
35use crate::statement_logging::WatchSetCreation;
36use crate::statement_logging::{
37 FrontendStatementLoggingEvent, PreparedStatementEvent, StatementLoggingFrontend,
38 StatementLoggingId,
39};
40use crate::{AdapterError, Client, CollectionIdBundle, ReadHolds, statement_logging};
41
/// Shared, thread-safe handle to the storage layer's `StorageCollections`
/// trait object, specialized to this crate's `Timestamp`.
pub type StorageCollectionsHandle = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = Timestamp>
        + Send
        + Sync,
>;
48
/// A client used by the adapter frontend to implement fast-path peeks
/// (see `implement_fast_path_peek_plan`) while talking to the coordinator
/// only where necessary.
#[derive(Debug)]
pub struct PeekClient {
    // Channel for sending `Command`s to (and calling into) the coordinator.
    coordinator_client: Client,
    // Per-instance compute clients, fetched lazily from the coordinator and
    // cached (see `ensure_compute_instance_client`).
    compute_instances:
        BTreeMap<ComputeInstanceId, mz_compute_client::controller::instance::Client<Timestamp>>,
    // Handle to storage collections, used for read holds, frontiers, and
    // collection metadata.
    pub storage_collections: StorageCollectionsHandle,
    // Generator of transient `GlobalId`s.
    pub transient_id_gen: Arc<TransientIdGen>,
    // Metrics reported by the optimizer.
    pub optimizer_metrics: OptimizerMetrics,
    // Per-timeline timestamp oracles, fetched lazily from the coordinator and
    // cached (see `ensure_oracle`).
    oracles: BTreeMap<Timeline, Arc<dyn TimestampOracle<Timestamp> + Send + Sync>>,
    // Used when streaming peek responses; passed along with the peek-stash
    // read settings in `implement_fast_path_peek_plan`.
    persist_client: PersistClient,
    // Frontend half of statement logging; also supplies the `now` clock used
    // by the `log_*` methods below.
    pub statement_logging_frontend: StatementLoggingFrontend,
}
70
71impl PeekClient {
72 pub fn new(
74 coordinator_client: Client,
75 storage_collections: StorageCollectionsHandle,
76 transient_id_gen: Arc<TransientIdGen>,
77 optimizer_metrics: OptimizerMetrics,
78 persist_client: PersistClient,
79 statement_logging_frontend: StatementLoggingFrontend,
80 ) -> Self {
81 Self {
82 coordinator_client,
83 compute_instances: Default::default(), storage_collections,
85 transient_id_gen,
86 optimizer_metrics,
87 statement_logging_frontend,
88 oracles: Default::default(), persist_client,
90 }
91 }
92
93 pub async fn ensure_compute_instance_client(
94 &mut self,
95 compute_instance: ComputeInstanceId,
96 ) -> Result<mz_compute_client::controller::instance::Client<Timestamp>, InstanceMissing> {
97 if !self.compute_instances.contains_key(&compute_instance) {
98 let client = self
99 .call_coordinator(|tx| Command::GetComputeInstanceClient {
100 instance_id: compute_instance,
101 tx,
102 })
103 .await?;
104 self.compute_instances.insert(compute_instance, client);
105 }
106 Ok(self
107 .compute_instances
108 .get(&compute_instance)
109 .expect("ensured above")
110 .clone())
111 }
112
    /// Returns the timestamp oracle for the given timeline, fetching it from
    /// the coordinator and caching it on first use.
    pub async fn ensure_oracle(
        &mut self,
        timeline: Timeline,
    ) -> Result<&mut Arc<dyn TimestampOracle<Timestamp> + Send + Sync>, AdapterError> {
        // NOTE(review): two-phase lookup (contains_key, then get_mut) rather
        // than an early return of `get_mut`'s result — returning the borrow
        // early would keep `self.oracles` mutably borrowed for the rest of
        // the function under current borrow-checker rules.
        if !self.oracles.contains_key(&timeline) {
            let oracle = self
                .call_coordinator(|tx| Command::GetOracle {
                    timeline: timeline.clone(),
                    tx,
                })
                .await?;
            self.oracles.insert(timeline.clone(), oracle);
        }
        Ok(self.oracles.get_mut(&timeline).expect("ensured above"))
    }
128
129 pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
132 let start = std::time::Instant::now();
133 let CatalogSnapshot { catalog } = self
134 .call_coordinator(|tx| Command::CatalogSnapshot { tx })
135 .await;
136 self.coordinator_client
137 .metrics()
138 .catalog_snapshot_seconds
139 .with_label_values(&[context])
140 .observe(start.elapsed().as_secs_f64());
141 catalog
142 }
143
144 pub(crate) async fn call_coordinator<T, F>(&self, f: F) -> T
145 where
146 F: FnOnce(oneshot::Sender<T>) -> Command,
147 {
148 let (tx, rx) = oneshot::channel();
149 self.coordinator_client.send(f(tx));
150 rx.await
151 .expect("if the coordinator is still alive, it shouldn't have dropped our call")
152 }
153
154 pub async fn acquire_read_holds_and_least_valid_write(
167 &mut self,
168 id_bundle: &CollectionIdBundle,
169 ) -> Result<(ReadHolds<Timestamp>, Antichain<Timestamp>), CollectionLookupError> {
170 let mut read_holds = ReadHolds::new();
171 let mut upper = Antichain::new();
172
173 if !id_bundle.storage_ids.is_empty() {
174 let desired_storage: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
175 let storage_read_holds = self
176 .storage_collections
177 .acquire_read_holds(desired_storage)?;
178 read_holds.storage_holds = storage_read_holds
179 .into_iter()
180 .map(|hold| (hold.id(), hold))
181 .collect();
182
183 let storage_ids: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
184 for f in self
185 .storage_collections
186 .collections_frontiers(storage_ids)?
187 {
188 upper.extend(f.write_frontier);
189 }
190 }
191
192 for (&instance_id, collection_ids) in &id_bundle.compute_ids {
193 let client = self.ensure_compute_instance_client(instance_id).await?;
194
195 for (id, read_hold, write_frontier) in client
196 .acquire_read_holds_and_collection_write_frontiers(
197 collection_ids.iter().copied().collect(),
198 )
199 .await?
200 {
201 let prev = read_holds
202 .compute_holds
203 .insert((instance_id, id), read_hold);
204 assert!(
205 prev.is_none(),
206 "duplicate compute ID in id_bundle {id_bundle:?}"
207 );
208
209 upper.extend(write_frontier);
210 }
211 }
212
213 Ok((read_holds, upper))
214 }
215
    /// Implements a fast-path peek, returning an `ExecuteResponse` that
    /// either carries finished rows (for constant plans) or a stream over
    /// which the peek's results will arrive.
    ///
    /// For `PeekExisting`/`PeekPersist` plans, the peek is first registered
    /// with the coordinator (`RegisterFrontendPeek`), then issued directly
    /// against the compute instance; on issue failure it is unregistered
    /// again.
    ///
    /// # Panics
    ///
    /// Panics if `input_read_holds` lacks a read hold on the peek target, or
    /// if `fast_path` is a plan variant other than `Constant`,
    /// `PeekExisting`, or `PeekPersist`.
    pub async fn implement_fast_path_peek_plan(
        &mut self,
        fast_path: FastPathPlan,
        timestamp: Timestamp,
        finishing: mz_expr::RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<mz_cluster_client::ReplicaId>,
        // Row type of the peek results before `finishing` is applied.
        intermediate_result_type: mz_repr::SqlRelationType,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
        row_set_finishing_seconds: Histogram,
        // Read holds covering the peek's inputs, including the peek target.
        input_read_holds: ReadHolds<Timestamp>,
        peek_stash_read_batch_size_bytes: usize,
        peek_stash_read_memory_budget_bytes: usize,
        conn_id: mz_adapter_types::connection::ConnectionId,
        depends_on: std::collections::BTreeSet<mz_repr::GlobalId>,
        // When set, statement-logging metadata for lifecycle events.
        watch_set: Option<WatchSetCreation>,
    ) -> Result<crate::ExecuteResponse, AdapterError> {
        // Constant plans need no dataflow at all: finish the rows locally and
        // return them immediately.
        if let FastPathPlan::Constant(rows_res, _) = fast_path {
            // A constant result has no storage or compute dependencies to
            // wait on, so log both "dependencies finished" lifecycle events
            // right away.
            if let Some(ref ws) = watch_set {
                self.log_lifecycle_event(
                    ws.logging_id,
                    statement_logging::StatementLifecycleEvent::StorageDependenciesFinished,
                );
                self.log_lifecycle_event(
                    ws.logging_id,
                    statement_logging::StatementLifecycleEvent::ComputeDependenciesFinished,
                );
            }

            // The plan may carry an evaluation error instead of rows.
            let mut rows = match rows_res {
                Ok(rows) => rows,
                Err(e) => return Err(e.into()),
            };
            // Merge duplicate rows so each row appears once with a summed
            // multiplicity.
            consolidate(&mut rows);

            let mut results = Vec::new();
            for (row, count) in rows {
                // A negative multiplicity surviving consolidation is an
                // error in a constant result.
                let count = match u64::try_from(count.into_inner()) {
                    Ok(u) => usize::cast_from(u),
                    Err(_) => {
                        return Err(AdapterError::Unstructured(anyhow::anyhow!(
                            "Negative multiplicity in constant result: {}",
                            count
                        )));
                    }
                };
                match std::num::NonZeroUsize::new(count) {
                    Some(nzu) => {
                        results.push((row, nzu));
                    }
                    None => {
                        // Zero multiplicity after consolidation: skip.
                    }
                };
            }
            let row_collection = RowCollection::new(results, &finishing.order_by);
            // Apply ORDER BY / LIMIT / projection and enforce result-size
            // limits.
            return match finishing.finish(
                row_collection,
                max_result_size,
                max_returned_query_size,
                &row_set_finishing_seconds,
            ) {
                Ok((rows, _bytes)) => Ok(Coordinator::send_immediate_rows(rows)),
                Err(e) => Err(AdapterError::ResultSize(e)),
            };
        }

        // Resolve the peek target, the read hold protecting it, and the
        // execution strategy to report for statement logging.
        let (peek_target, target_read_hold, literal_constraints, mfp, strategy) = match fast_path {
            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, mfp) => {
                let peek_target = PeekTarget::Index { id: idx_id };
                let target_read_hold = input_read_holds
                    .compute_holds
                    .get(&(compute_instance, idx_id))
                    .expect("missing compute read hold on PeekExisting peek target")
                    .clone();
                let strategy = statement_logging::StatementExecutionStrategy::FastPath;
                (
                    peek_target,
                    target_read_hold,
                    literal_constraints,
                    mfp,
                    strategy,
                )
            }
            FastPathPlan::PeekPersist(coll_id, literal_constraint, mfp) => {
                // Persist peeks carry at most one literal constraint; the
                // peek API expects an optional list.
                let literal_constraints = literal_constraint.map(|r| vec![r]);
                let metadata = self
                    .storage_collections
                    .collection_metadata(coll_id)
                    .map_err(AdapterError::concurrent_dependency_drop_from_collection_missing)?
                    .clone();
                let peek_target = PeekTarget::Persist {
                    id: coll_id,
                    metadata,
                };
                let target_read_hold = input_read_holds
                    .storage_holds
                    .get(&coll_id)
                    .expect("missing storage read hold on PeekPersist peek target")
                    .clone();
                let strategy = statement_logging::StatementExecutionStrategy::PersistFastPath;
                (
                    peek_target,
                    target_read_hold,
                    literal_constraints,
                    mfp,
                    strategy,
                )
            }
            _ => {
                // `Constant` was handled (and returned) above; no other
                // variants are expected here.
                unreachable!()
            }
        };

        // Channel on which the compute instance will deliver the peek
        // response.
        let (rows_tx, rows_rx) = oneshot::channel();
        let uuid = Uuid::new_v4();

        // Build a `RelationDesc` for the intermediate results, with
        // synthetic column names.
        let cols = (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
        let result_desc = RelationDesc::new(intermediate_result_type.clone(), cols);

        let client = self
            .ensure_compute_instance_client(compute_instance)
            .await
            .map_err(AdapterError::concurrent_dependency_drop_from_instance_missing)?;

        // Register the pending peek with the coordinator BEFORE issuing it,
        // so the coordinator can track and cancel it.
        self.call_coordinator(|tx| Command::RegisterFrontendPeek {
            uuid,
            conn_id: conn_id.clone(),
            cluster_id: compute_instance,
            depends_on,
            is_fast_path: true,
            watch_set,
            tx,
        })
        .await?;

        let finishing_for_instance = finishing.clone();
        let peek_result = client
            .peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing_for_instance,
                mfp,
                target_read_hold,
                target_replica,
                rows_tx,
            )
            .await;

        // If issuing the peek failed, undo the registration above before
        // surfacing the error.
        if let Err(err) = peek_result {
            self.call_coordinator(|tx| Command::UnregisterFrontendPeek { uuid, tx })
                .await;
            return Err(
                AdapterError::concurrent_dependency_drop_from_instance_peek_error(
                    err,
                    compute_instance,
                ),
            );
        }

        // Stream that applies `finishing` to the incoming response and reads
        // stashed results (if any) via persist.
        let peek_response_stream = Coordinator::create_peek_response_stream(
            rows_rx,
            finishing,
            max_result_size,
            max_returned_query_size,
            row_set_finishing_seconds,
            self.persist_client.clone(),
            peek_stash_read_batch_size_bytes,
            peek_stash_read_memory_budget_bytes,
        );

        Ok(crate::ExecuteResponse::SendingRowsStreaming {
            rows: Box::pin(peek_response_stream),
            instance_id: compute_instance,
            strategy,
        })
    }
418
419 pub(crate) fn log_began_execution(
423 &self,
424 record: statement_logging::StatementBeganExecutionRecord,
425 mseh_update: Row,
426 prepared_statement: Option<PreparedStatementEvent>,
427 ) {
428 self.coordinator_client
429 .send(Command::FrontendStatementLogging(
430 FrontendStatementLoggingEvent::BeganExecution {
431 record,
432 mseh_update,
433 prepared_statement,
434 },
435 ));
436 }
437
438 pub(crate) fn log_set_cluster(
440 &self,
441 id: StatementLoggingId,
442 cluster_id: mz_controller_types::ClusterId,
443 ) {
444 self.coordinator_client
445 .send(Command::FrontendStatementLogging(
446 FrontendStatementLoggingEvent::SetCluster { id, cluster_id },
447 ));
448 }
449
450 pub(crate) fn log_set_timestamp(&self, id: StatementLoggingId, timestamp: mz_repr::Timestamp) {
452 self.coordinator_client
453 .send(Command::FrontendStatementLogging(
454 FrontendStatementLoggingEvent::SetTimestamp { id, timestamp },
455 ));
456 }
457
458 pub(crate) fn log_set_transient_index_id(
460 &self,
461 id: StatementLoggingId,
462 transient_index_id: mz_repr::GlobalId,
463 ) {
464 self.coordinator_client
465 .send(Command::FrontendStatementLogging(
466 FrontendStatementLoggingEvent::SetTransientIndex {
467 id,
468 transient_index_id,
469 },
470 ));
471 }
472
473 pub(crate) fn log_lifecycle_event(
475 &self,
476 id: StatementLoggingId,
477 event: statement_logging::StatementLifecycleEvent,
478 ) {
479 let when = (self.statement_logging_frontend.now)();
480 self.coordinator_client
481 .send(Command::FrontendStatementLogging(
482 FrontendStatementLoggingEvent::Lifecycle { id, event, when },
483 ));
484 }
485
486 pub(crate) fn log_ended_execution(
488 &self,
489 id: StatementLoggingId,
490 reason: statement_logging::StatementEndedExecutionReason,
491 ) {
492 let ended_at = (self.statement_logging_frontend.now)();
493 let record = statement_logging::StatementEndedExecutionRecord {
494 id: id.0,
495 reason,
496 ended_at,
497 };
498 self.coordinator_client
499 .send(Command::FrontendStatementLogging(
500 FrontendStatementLoggingEvent::EndedExecution(record),
501 ));
502 }
503}