1use std::collections::BTreeMap;
11use std::sync::Arc;
12
13use differential_dataflow::consolidation::consolidate;
14use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing};
15use mz_compute_client::protocol::command::PeekTarget;
16use mz_compute_types::ComputeInstanceId;
17use mz_expr::row::RowCollection;
18use mz_ore::cast::CastFrom;
19use mz_persist_client::PersistClient;
20use mz_repr::RelationDesc;
21use mz_repr::Timestamp;
22use mz_repr::global_id::TransientIdGen;
23use mz_sql::optimizer_metrics::OptimizerMetrics;
24use mz_storage_types::sources::Timeline;
25use mz_timestamp_oracle::TimestampOracle;
26use prometheus::Histogram;
27use timely::progress::Antichain;
28use tokio::sync::oneshot;
29use uuid::Uuid;
30
31use crate::catalog::Catalog;
32use crate::command::{CatalogSnapshot, Command};
33use crate::coord::Coordinator;
34use crate::coord::peek::FastPathPlan;
35use crate::{AdapterError, Client, CollectionIdBundle, ReadHolds, statement_logging};
36
/// Shared, thread-safe handle to a
/// [`StorageCollections`](mz_storage_client::storage_collections::StorageCollections)
/// implementation whose timestamp type is [`Timestamp`].
///
/// The trait object is `Send + Sync` and wrapped in an [`Arc`], so the handle can be cloned and
/// shared freely.
pub type StorageCollectionsHandle = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = Timestamp>
        + Send
        + Sync,
>;
43
/// Client-side state for running peeks, talking to the coordinator only when something is not
/// already available locally.
///
/// Compute instance clients and timestamp oracles are fetched from the coordinator on first use
/// and cached in this struct afterwards.
#[derive(Debug)]
pub struct PeekClient {
    /// Channel to the coordinator; every `Command` issued by this type is sent through it.
    coordinator_client: Client,
    /// Cache of per-instance compute clients, filled lazily by
    /// `ensure_compute_instance_client`.
    compute_instances:
        BTreeMap<ComputeInstanceId, mz_compute_client::controller::instance::Client<Timestamp>>,
    /// Handle used to acquire storage read holds and to look up storage frontiers and collection
    /// metadata.
    pub storage_collections: StorageCollectionsHandle,
    /// Shared generator for transient IDs.
    // NOTE(review): not used by the methods in this part of the file; presumably consumed by
    // callers — confirm.
    pub transient_id_gen: Arc<TransientIdGen>,
    /// Metrics handle for the optimizer.
    // NOTE(review): not used by the methods in this part of the file; presumably consumed by
    // callers — confirm.
    pub optimizer_metrics: OptimizerMetrics,
    /// Cache of per-timeline timestamp oracles, filled lazily by `ensure_oracle`.
    oracles: BTreeMap<Timeline, Arc<dyn TimestampOracle<Timestamp> + Send + Sync>>,
    /// Persist client that is cloned into the peek response stream of non-constant fast-path
    /// peeks.
    persist_client: PersistClient,
}
63
64impl PeekClient {
65 pub fn new(
67 coordinator_client: Client,
68 storage_collections: StorageCollectionsHandle,
69 transient_id_gen: Arc<TransientIdGen>,
70 optimizer_metrics: OptimizerMetrics,
71 persist_client: PersistClient,
72 ) -> Self {
73 Self {
74 coordinator_client,
75 compute_instances: Default::default(), storage_collections,
77 transient_id_gen,
78 optimizer_metrics,
79 oracles: Default::default(), persist_client,
81 }
82 }
83
84 pub async fn ensure_compute_instance_client(
85 &mut self,
86 compute_instance: ComputeInstanceId,
87 ) -> Result<&mut mz_compute_client::controller::instance::Client<Timestamp>, InstanceMissing>
88 {
89 if !self.compute_instances.contains_key(&compute_instance) {
90 let client = self
91 .call_coordinator(|tx| Command::GetComputeInstanceClient {
92 instance_id: compute_instance,
93 tx,
94 })
95 .await?;
96 self.compute_instances.insert(compute_instance, client);
97 }
98 Ok(self
99 .compute_instances
100 .get_mut(&compute_instance)
101 .expect("ensured above"))
102 }
103
104 pub async fn ensure_oracle(
105 &mut self,
106 timeline: Timeline,
107 ) -> Result<&mut Arc<dyn TimestampOracle<Timestamp> + Send + Sync>, AdapterError> {
108 if !self.oracles.contains_key(&timeline) {
109 let oracle = self
110 .call_coordinator(|tx| Command::GetOracle {
111 timeline: timeline.clone(),
112 tx,
113 })
114 .await?;
115 self.oracles.insert(timeline.clone(), oracle);
116 }
117 Ok(self.oracles.get_mut(&timeline).expect("ensured above"))
118 }
119
120 pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
123 let start = std::time::Instant::now();
124 let CatalogSnapshot { catalog } = self
125 .call_coordinator(|tx| Command::CatalogSnapshot { tx })
126 .await;
127 self.coordinator_client
128 .metrics()
129 .catalog_snapshot_seconds
130 .with_label_values(&[context])
131 .observe(start.elapsed().as_secs_f64());
132 catalog
133 }
134
135 pub(crate) async fn call_coordinator<T, F>(&self, f: F) -> T
136 where
137 F: FnOnce(oneshot::Sender<T>) -> Command,
138 {
139 let (tx, rx) = oneshot::channel();
140 self.coordinator_client.send(f(tx));
141 rx.await
142 .expect("if the coordinator is still alive, it shouldn't have dropped our call")
143 }
144
145 pub async fn acquire_read_holds_and_least_valid_write(
158 &mut self,
159 id_bundle: &CollectionIdBundle,
160 ) -> Result<(ReadHolds<Timestamp>, Antichain<Timestamp>), CollectionLookupError> {
161 let mut read_holds = ReadHolds::new();
162 let mut upper = Antichain::new();
163
164 if !id_bundle.storage_ids.is_empty() {
165 let desired_storage: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
166 let storage_read_holds = self
167 .storage_collections
168 .acquire_read_holds(desired_storage)?;
169 read_holds.storage_holds = storage_read_holds
170 .into_iter()
171 .map(|hold| (hold.id(), hold))
172 .collect();
173
174 let storage_ids: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
175 for f in self
176 .storage_collections
177 .collections_frontiers(storage_ids)?
178 {
179 upper.extend(f.write_frontier);
180 }
181 }
182
183 for (&instance_id, collection_ids) in &id_bundle.compute_ids {
184 let client = self.ensure_compute_instance_client(instance_id).await?;
185
186 for (id, read_hold, write_frontier) in client
187 .acquire_read_holds_and_collection_write_frontiers(
188 collection_ids.iter().copied().collect(),
189 )
190 .await?
191 {
192 let prev = read_holds
193 .compute_holds
194 .insert((instance_id, id), read_hold);
195 assert!(
196 prev.is_none(),
197 "duplicate compute ID in id_bundle {id_bundle:?}"
198 );
199
200 upper.extend(write_frontier);
201 }
202 }
203
204 Ok((read_holds, upper))
205 }
206
207 pub async fn implement_fast_path_peek_plan(
220 &mut self,
221 fast_path: FastPathPlan,
222 timestamp: Timestamp,
223 finishing: mz_expr::RowSetFinishing,
224 compute_instance: ComputeInstanceId,
225 target_replica: Option<mz_cluster_client::ReplicaId>,
226 intermediate_result_type: mz_repr::SqlRelationType,
227 max_result_size: u64,
228 max_returned_query_size: Option<u64>,
229 row_set_finishing_seconds: Histogram,
230 input_read_holds: ReadHolds<Timestamp>,
231 peek_stash_read_batch_size_bytes: usize,
232 peek_stash_read_memory_budget_bytes: usize,
233 ) -> Result<crate::ExecuteResponse, AdapterError> {
234 if let FastPathPlan::Constant(rows_res, _) = fast_path {
236 let mut rows = match rows_res {
237 Ok(rows) => rows,
238 Err(e) => return Err(e.into()),
239 };
240 consolidate(&mut rows);
241
242 let mut results = Vec::new();
243 for (row, count) in rows {
244 let count = match u64::try_from(count.into_inner()) {
245 Ok(u) => usize::cast_from(u),
246 Err(_) => {
247 return Err(AdapterError::Unstructured(anyhow::anyhow!(
248 "Negative multiplicity in constant result: {}",
249 count
250 )));
251 }
252 };
253 match std::num::NonZeroUsize::new(count) {
254 Some(nzu) => {
255 results.push((row, nzu));
256 }
257 None => {
258 }
260 };
261 }
262 let row_collection = RowCollection::new(results, &finishing.order_by);
263 return match finishing.finish(
264 row_collection,
265 max_result_size,
266 max_returned_query_size,
267 &row_set_finishing_seconds,
268 ) {
269 Ok((rows, _bytes)) => Ok(Coordinator::send_immediate_rows(rows)),
270 Err(e) => Err(AdapterError::ResultSize(e)),
272 };
273 }
274
275 let (peek_target, target_read_hold, literal_constraints, mfp, strategy) = match fast_path {
276 FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, mfp) => {
277 let peek_target = PeekTarget::Index { id: idx_id };
278 let target_read_hold = input_read_holds
279 .compute_holds
280 .get(&(compute_instance, idx_id))
281 .expect("missing compute read hold on PeekExisting peek target")
282 .clone();
283 let strategy = statement_logging::StatementExecutionStrategy::FastPath;
284 (
285 peek_target,
286 target_read_hold,
287 literal_constraints,
288 mfp,
289 strategy,
290 )
291 }
292 FastPathPlan::PeekPersist(coll_id, literal_constraint, mfp) => {
293 let literal_constraints = literal_constraint.map(|r| vec![r]);
294 let metadata = self
295 .storage_collections
296 .collection_metadata(coll_id)
297 .map_err(AdapterError::concurrent_dependency_drop_from_collection_missing)?
298 .clone();
299 let peek_target = PeekTarget::Persist {
300 id: coll_id,
301 metadata,
302 };
303 let target_read_hold = input_read_holds
304 .storage_holds
305 .get(&coll_id)
306 .expect("missing storage read hold on PeekPersist peek target")
307 .clone();
308 let strategy = statement_logging::StatementExecutionStrategy::PersistFastPath;
309 (
310 peek_target,
311 target_read_hold,
312 literal_constraints,
313 mfp,
314 strategy,
315 )
316 }
317 _ => {
318 unreachable!()
320 }
321 };
322
323 let (rows_tx, rows_rx) = oneshot::channel();
324 let uuid = Uuid::new_v4();
325
326 let cols = (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
329 let result_desc = RelationDesc::new(intermediate_result_type.clone(), cols);
330
331 let client = self
333 .ensure_compute_instance_client(compute_instance)
334 .await
335 .map_err(AdapterError::concurrent_dependency_drop_from_instance_missing)?;
336 let finishing_for_instance = finishing.clone();
337 client
338 .peek(
339 peek_target,
340 literal_constraints,
341 uuid,
342 timestamp,
343 result_desc,
344 finishing_for_instance,
345 mfp,
346 target_read_hold,
347 target_replica,
348 rows_tx,
349 )
350 .await
351 .map_err(|err| {
352 AdapterError::concurrent_dependency_drop_from_peek_error(err, compute_instance)
353 })?;
354
355 let peek_response_stream = Coordinator::create_peek_response_stream(
356 rows_rx,
357 finishing,
358 max_result_size,
359 max_returned_query_size,
360 row_set_finishing_seconds,
361 self.persist_client.clone(),
362 peek_stash_read_batch_size_bytes,
363 peek_stash_read_memory_budget_bytes,
364 );
365 Ok(crate::ExecuteResponse::SendingRowsStreaming {
366 rows: Box::pin(peek_response_stream),
367 instance_id: compute_instance,
368 strategy,
369 })
370 }
371}