mz_adapter/peek_client.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::sync::Arc;

use differential_dataflow::consolidation::consolidate;
use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing};
use mz_compute_client::protocol::command::PeekTarget;
use mz_compute_types::ComputeInstanceId;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_persist_client::PersistClient;
use mz_repr::RelationDesc;
use mz_repr::Timestamp;
use mz_repr::global_id::TransientIdGen;
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_storage_types::sources::Timeline;
use mz_timestamp_oracle::TimestampOracle;
use prometheus::Histogram;
use timely::progress::Antichain;
use tokio::sync::oneshot;
use uuid::Uuid;

use crate::catalog::Catalog;
use crate::command::{CatalogSnapshot, Command};
use crate::coord::Coordinator;
use crate::coord::peek::FastPathPlan;
use crate::{AdapterError, Client, CollectionIdBundle, ReadHolds, statement_logging};

/// Type alias for the storage collections trait object we consult for since
/// and write frontiers.
pub type StorageCollectionsHandle = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = Timestamp>
        + Send
        + Sync,
>;

/// Clients needed for peek sequencing in the Adapter Frontend.
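///
/// # Example
///
/// A minimal sketch of the intended usage, assuming a coordinator `Client`
/// and the other shared handles are already in scope:
///
/// ```ignore
/// let mut peek_client = PeekClient::new(
///     coordinator_client,
///     storage_collections,
///     transient_id_gen,
///     optimizer_metrics,
///     persist_client,
/// );
/// // Oracles and instance clients are fetched from the coordinator on
/// // first use and cached thereafter.
/// let oracle = peek_client.ensure_oracle(Timeline::EpochMilliseconds).await?;
/// ```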
#[derive(Debug)]
pub struct PeekClient {
    coordinator_client: Client,
    /// Channels to talk to each compute `Instance` task directly. Lazily populated.
    /// Note that these are never cleaned up. In theory, this could lead to a very slow memory leak
    /// if a long-running user session keeps peeking on clusters that are being created and dropped
    /// in a hot loop. Hopefully this won't occur any time soon.
    compute_instances:
        BTreeMap<ComputeInstanceId, mz_compute_client::controller::instance::Client<Timestamp>>,
    /// Handle to storage collections for reading frontiers and policies.
    pub storage_collections: StorageCollectionsHandle,
    /// A generator for transient `GlobalId`s, shared with the Coordinator.
    pub transient_id_gen: Arc<TransientIdGen>,
    pub optimizer_metrics: OptimizerMetrics,
    /// Per-timeline oracles from the coordinator. Lazily populated.
    oracles: BTreeMap<Timeline, Arc<dyn TimestampOracle<Timestamp> + Send + Sync>>,
    persist_client: PersistClient,
}

impl PeekClient {
    /// Creates a `PeekClient`.
    pub fn new(
        coordinator_client: Client,
        storage_collections: StorageCollectionsHandle,
        transient_id_gen: Arc<TransientIdGen>,
        optimizer_metrics: OptimizerMetrics,
        persist_client: PersistClient,
    ) -> Self {
        Self {
            coordinator_client,
            compute_instances: Default::default(), // lazily populated
            storage_collections,
            transient_id_gen,
            optimizer_metrics,
            oracles: Default::default(), // lazily populated
            persist_client,
        }
    }

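    /// Returns the client for the given compute instance, fetching it from
    /// the coordinator and caching it on first use.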
    pub async fn ensure_compute_instance_client(
        &mut self,
        compute_instance: ComputeInstanceId,
    ) -> Result<&mut mz_compute_client::controller::instance::Client<Timestamp>, InstanceMissing>
    {
        if !self.compute_instances.contains_key(&compute_instance) {
            let client = self
                .call_coordinator(|tx| Command::GetComputeInstanceClient {
                    instance_id: compute_instance,
                    tx,
                })
                .await?;
            self.compute_instances.insert(compute_instance, client);
        }
        Ok(self
            .compute_instances
            .get_mut(&compute_instance)
            .expect("ensured above"))
    }

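    /// Returns the timestamp oracle for the given timeline, fetching it from
    /// the coordinator and caching it on first use.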
    pub async fn ensure_oracle(
        &mut self,
        timeline: Timeline,
    ) -> Result<&mut Arc<dyn TimestampOracle<Timestamp> + Send + Sync>, AdapterError> {
        if !self.oracles.contains_key(&timeline) {
            let oracle = self
                .call_coordinator(|tx| Command::GetOracle {
                    timeline: timeline.clone(),
                    tx,
                })
                .await?;
            self.oracles.insert(timeline.clone(), oracle);
        }
        Ok(self.oracles.get_mut(&timeline).expect("ensured above"))
    }

    /// Fetches a snapshot of the catalog for use in frontend peek sequencing.
    /// Records the time taken in the adapter metrics, labeled by `context`.
    pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
        let start = std::time::Instant::now();
        let CatalogSnapshot { catalog } = self
            .call_coordinator(|tx| Command::CatalogSnapshot { tx })
            .await;
        self.coordinator_client
            .metrics()
            .catalog_snapshot_seconds
            .with_label_values(&[context])
            .observe(start.elapsed().as_secs_f64());
        catalog
    }

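    /// Sends a command to the coordinator and waits for its response on a
    /// freshly created oneshot channel.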
    pub(crate) async fn call_coordinator<T, F>(&self, f: F) -> T
    where
        F: FnOnce(oneshot::Sender<T>) -> Command,
    {
        let (tx, rx) = oneshot::channel();
        self.coordinator_client.send(f(tx));
        rx.await
            .expect("if the coordinator is still alive, it shouldn't have dropped our call")
    }

    /// Acquires read holds on the given compute/storage collections, and
    /// determines the smallest common valid write frontier among them.
    ///
    /// Similar to `Coordinator::acquire_read_holds` and
    /// `TimestampProvider::least_valid_write` combined.
    ///
    /// Note: Unlike the Coordinator/StorageController's `least_valid_write`, which treats sinks
    /// specially when fetching storage frontiers (see `mz_storage_controller::collections_frontiers`),
    /// we intentionally do not special-case sinks here, because peeks never read from sinks.
    /// Therefore, using `StorageCollections::collections_frontiers` is sufficient.
    ///
    /// Note: `self` is taken `&mut` because of the lazy fetching in
    /// `ensure_compute_instance_client`.
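    ///
    /// # Example
    ///
    /// A sketch of how a caller might use the result to choose a peek
    /// timestamp (names are illustrative):
    ///
    /// ```ignore
    /// let (read_holds, least_valid_write) = peek_client
    ///     .acquire_read_holds_and_least_valid_write(&id_bundle)
    ///     .await?;
    /// // The holds keep the inputs readable while a timestamp is chosen; any
    /// // timestamp strictly less than `least_valid_write` can be served
    /// // without waiting for more input data.
    /// ```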
    pub async fn acquire_read_holds_and_least_valid_write(
        &mut self,
        id_bundle: &CollectionIdBundle,
    ) -> Result<(ReadHolds<Timestamp>, Antichain<Timestamp>), CollectionLookupError> {
        let mut read_holds = ReadHolds::new();
        let mut upper = Antichain::new();

        if !id_bundle.storage_ids.is_empty() {
            let storage_ids: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
            let storage_read_holds = self
                .storage_collections
                .acquire_read_holds(storage_ids.clone())?;
            read_holds.storage_holds = storage_read_holds
                .into_iter()
                .map(|hold| (hold.id(), hold))
                .collect();

            for f in self
                .storage_collections
                .collections_frontiers(storage_ids)?
            {
                upper.extend(f.write_frontier);
            }
        }

        for (&instance_id, collection_ids) in &id_bundle.compute_ids {
            let client = self.ensure_compute_instance_client(instance_id).await?;

            for (id, read_hold, write_frontier) in client
                .acquire_read_holds_and_collection_write_frontiers(
                    collection_ids.iter().copied().collect(),
                )
                .await?
            {
                let prev = read_holds
                    .compute_holds
                    .insert((instance_id, id), read_hold);
                assert!(
                    prev.is_none(),
                    "duplicate compute ID in id_bundle {id_bundle:?}"
                );

                upper.extend(write_frontier);
            }
        }

        Ok((read_holds, upper))
    }

    /// Implements a fast-path peek plan.
    /// This is similar to `Coordinator::implement_peek_plan`, but only for fast-path peeks.
    ///
    /// Note: `self` is taken `&mut` because of the lazy fetching in
    /// `ensure_compute_instance_client`.
    ///
    /// Note: `input_read_holds` has holds for all inputs. For fast-path peeks, this includes the
    /// peek target. For slow-path peeks (to be implemented later), we'll need to additionally call
    /// into the Controller to acquire a hold on the peek target after we create the dataflow.
    ///
    /// TODO(peek-seq): add statement logging
    /// TODO(peek-seq): cancellation (see pending_peeks/client_pending_peeks wiring in the old
    /// sequencing)
    pub async fn implement_fast_path_peek_plan(
        &mut self,
        fast_path: FastPathPlan,
        timestamp: Timestamp,
        finishing: mz_expr::RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<mz_cluster_client::ReplicaId>,
        intermediate_result_type: mz_repr::SqlRelationType,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
        row_set_finishing_seconds: Histogram,
        input_read_holds: ReadHolds<Timestamp>,
        peek_stash_read_batch_size_bytes: usize,
        peek_stash_read_memory_budget_bytes: usize,
    ) -> Result<crate::ExecuteResponse, AdapterError> {
        // If the dataflow optimizes to a constant expression, we can immediately return the result.
        if let FastPathPlan::Constant(rows_res, _) = fast_path {
            let mut rows = match rows_res {
                Ok(rows) => rows,
                Err(e) => return Err(e.into()),
            };
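            // Sum the multiplicities of identical rows; rows whose counts
            // cancel out to zero are dropped.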
            consolidate(&mut rows);

            let mut results = Vec::new();
            for (row, count) in rows {
                let count = match u64::try_from(count.into_inner()) {
                    Ok(u) => usize::cast_from(u),
                    Err(_) => {
                        return Err(AdapterError::Unstructured(anyhow::anyhow!(
                            "Negative multiplicity in constant result: {}",
                            count
                        )));
                    }
                };
                match std::num::NonZeroUsize::new(count) {
                    Some(nzu) => {
                        results.push((row, nzu));
                    }
                    None => {
                        // No need to retain 0 diffs.
                    }
                };
            }
            let row_collection = RowCollection::new(results, &finishing.order_by);
            return match finishing.finish(
                row_collection,
                max_result_size,
                max_returned_query_size,
                &row_set_finishing_seconds,
            ) {
                Ok((rows, _bytes)) => Ok(Coordinator::send_immediate_rows(rows)),
                // TODO(peek-seq): make this a structured error. (also in the old sequencing)
                Err(e) => Err(AdapterError::ResultSize(e)),
            };
        }

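        // The remaining fast paths read either from an existing index (an
        // arrangement already maintained on the instance) or directly from a
        // persist-backed collection.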
        let (peek_target, target_read_hold, literal_constraints, mfp, strategy) = match fast_path {
            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, mfp) => {
                let peek_target = PeekTarget::Index { id: idx_id };
                let target_read_hold = input_read_holds
                    .compute_holds
                    .get(&(compute_instance, idx_id))
                    .expect("missing compute read hold on PeekExisting peek target")
                    .clone();
                let strategy = statement_logging::StatementExecutionStrategy::FastPath;
                (
                    peek_target,
                    target_read_hold,
                    literal_constraints,
                    mfp,
                    strategy,
                )
            }
            FastPathPlan::PeekPersist(coll_id, literal_constraint, mfp) => {
                let literal_constraints = literal_constraint.map(|r| vec![r]);
                let metadata = self
                    .storage_collections
                    .collection_metadata(coll_id)
                    .map_err(AdapterError::concurrent_dependency_drop_from_collection_missing)?
                    .clone();
                let peek_target = PeekTarget::Persist {
                    id: coll_id,
                    metadata,
                };
                let target_read_hold = input_read_holds
                    .storage_holds
                    .get(&coll_id)
                    .expect("missing storage read hold on PeekPersist peek target")
                    .clone();
                let strategy = statement_logging::StatementExecutionStrategy::PersistFastPath;
                (
                    peek_target,
                    target_read_hold,
                    literal_constraints,
                    mfp,
                    strategy,
                )
            }
            _ => {
                // `FastPathPlan::Constant` was handled above.
                unreachable!()
            }
        };

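        // The compute instance will answer this peek on `rows_tx`; `uuid`
        // identifies the peek in the compute protocol.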
        let (rows_tx, rows_rx) = oneshot::channel();
        let uuid = Uuid::new_v4();

        // At this stage we don't know column names for the result, because we
        // only know the peek's result type as a bare `SqlRelationType`.
        let cols = (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
        let result_desc = RelationDesc::new(intermediate_result_type.clone(), cols);

        // Issue the peek to the compute instance.
        let client = self
            .ensure_compute_instance_client(compute_instance)
            .await
            .map_err(AdapterError::concurrent_dependency_drop_from_instance_missing)?;
        let finishing_for_instance = finishing.clone();
        client
            .peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing_for_instance,
                mfp,
                target_read_hold,
                target_replica,
                rows_tx,
            )
            .await
            .map_err(|err| {
                AdapterError::concurrent_dependency_drop_from_peek_error(err, compute_instance)
            })?;

        let peek_response_stream = Coordinator::create_peek_response_stream(
            rows_rx,
            finishing,
            max_result_size,
            max_returned_query_size,
            row_set_finishing_seconds,
            self.persist_client.clone(),
            peek_stash_read_batch_size_bytes,
            peek_stash_read_memory_budget_bytes,
        );
        Ok(crate::ExecuteResponse::SendingRowsStreaming {
            rows: Box::pin(peek_response_stream),
            instance_id: compute_instance,
            strategy,
        })
    }
}