// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::sync::Arc;

use differential_dataflow::consolidation::consolidate;
use mz_compute_client::controller::error::{CollectionMissing, InstanceMissing};
use mz_compute_client::controller::instance_client::InstanceClient;
use mz_compute_client::controller::instance_client::{AcquireReadHoldsError, InstanceShutDown};
use mz_compute_client::protocol::command::PeekTarget;
use mz_compute_types::ComputeInstanceId;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_persist_client::PersistClient;
use mz_repr::GlobalId;
use mz_repr::Timestamp;
use mz_repr::global_id::TransientIdGen;
use mz_repr::{RelationDesc, Row};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_storage_types::sources::Timeline;
use mz_timestamp_oracle::TimestampOracle;
use prometheus::Histogram;
use thiserror::Error;
use timely::progress::Antichain;
use tokio::sync::oneshot;
use uuid::Uuid;

use crate::catalog::Catalog;
use crate::command::{CatalogSnapshot, Command};
use crate::coord::Coordinator;
use crate::coord::peek::FastPathPlan;
use crate::statement_logging::WatchSetCreation;
use crate::statement_logging::{
    FrontendStatementLoggingEvent, PreparedStatementEvent, StatementLoggingFrontend,
    StatementLoggingId,
};
use crate::{AdapterError, Client, CollectionIdBundle, ReadHolds, statement_logging};

/// Type alias for the storage collections handle that we consult for sinces and write frontiers.
pub type StorageCollectionsHandle = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = Timestamp>
        + Send
        + Sync,
>;

/// Clients needed for peek sequencing in the Adapter Frontend.
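///
/// A rough sketch of the intended call sequence for a fast-path peek (local names are
/// illustrative and most arguments are elided):
///
/// ```ignore
/// let catalog = peek_client.catalog_snapshot("peek").await;
/// let (read_holds, least_valid_write) = peek_client
///     .acquire_read_holds_and_least_valid_write(&id_bundle)
///     .await?;
/// let oracle = peek_client.ensure_oracle(timeline).await?;
/// let response = peek_client
///     .implement_fast_path_peek_plan(fast_path, timestamp, /* … */)
///     .await?;
/// ```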
#[derive(Debug)]
pub struct PeekClient {
    /// Client for sending commands to the Coordinator.
    coordinator_client: Client,
    /// Channels to talk to each compute Instance task directly. Lazily populated.
    /// Note that these are never cleaned up. In theory, this could lead to a very slow memory leak
    /// if a long-running user session keeps peeking on clusters that are being created and dropped
    /// in a hot loop. Hopefully this won't occur any time soon.
    compute_instances: BTreeMap<ComputeInstanceId, InstanceClient<Timestamp>>,
    /// Handle to storage collections for reading frontiers and policies.
    pub storage_collections: StorageCollectionsHandle,
    /// A generator for transient `GlobalId`s, shared with Coordinator.
    pub transient_id_gen: Arc<TransientIdGen>,
    /// Metrics recorded by the optimizer.
    pub optimizer_metrics: OptimizerMetrics,
    /// Per-timeline oracles from the coordinator. Lazily populated.
    oracles: BTreeMap<Timeline, Arc<dyn TimestampOracle<Timestamp> + Send + Sync>>,
    /// Persist client, used when reading peek results that were spilled to the peek stash.
    persist_client: PersistClient,
    /// Statement logging state for frontend peek sequencing.
    pub statement_logging_frontend: StatementLoggingFrontend,
}

impl PeekClient {
    /// Creates a PeekClient.
    pub fn new(
        coordinator_client: Client,
        storage_collections: StorageCollectionsHandle,
        transient_id_gen: Arc<TransientIdGen>,
        optimizer_metrics: OptimizerMetrics,
        persist_client: PersistClient,
        statement_logging_frontend: StatementLoggingFrontend,
    ) -> Self {
        Self {
            coordinator_client,
            compute_instances: Default::default(), // lazily populated
            storage_collections,
            transient_id_gen,
            optimizer_metrics,
            statement_logging_frontend,
            oracles: Default::default(), // lazily populated
            persist_client,
        }
    }

    /// Returns a client for talking directly to the given compute instance, fetching it from
    /// the coordinator on first use and caching it for subsequent calls.
    pub async fn ensure_compute_instance_client(
        &mut self,
        compute_instance: ComputeInstanceId,
    ) -> Result<InstanceClient<Timestamp>, InstanceMissing> {
        if !self.compute_instances.contains_key(&compute_instance) {
            let client = self
                .call_coordinator(|tx| Command::GetComputeInstanceClient {
                    instance_id: compute_instance,
                    tx,
                })
                .await?;
            self.compute_instances.insert(compute_instance, client);
        }
        Ok(self
            .compute_instances
            .get(&compute_instance)
            .expect("ensured above")
            .clone())
    }

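    /// Returns the timestamp oracle for the given timeline, fetching it from the coordinator
    /// on first use and caching it for subsequent calls.
    ///
    /// A minimal usage sketch (hypothetical caller; the timeline and the use of `read_ts` are
    /// illustrative):
    ///
    /// ```ignore
    /// let oracle = peek_client.ensure_oracle(Timeline::EpochMilliseconds).await?;
    /// let read_ts = oracle.read_ts().await;
    /// ```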
    pub async fn ensure_oracle(
        &mut self,
        timeline: Timeline,
    ) -> Result<&mut Arc<dyn TimestampOracle<Timestamp> + Send + Sync>, AdapterError> {
        if !self.oracles.contains_key(&timeline) {
            let oracle = self
                .call_coordinator(|tx| Command::GetOracle {
                    timeline: timeline.clone(),
                    tx,
                })
                .await?;
            self.oracles.insert(timeline.clone(), oracle);
        }
        Ok(self.oracles.get_mut(&timeline).expect("ensured above"))
    }

    /// Fetch a snapshot of the catalog for use in frontend peek sequencing.
    /// Records the time taken in the adapter metrics, labeled by `context`.
    pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
        let start = std::time::Instant::now();
        let CatalogSnapshot { catalog } = self
            .call_coordinator(|tx| Command::CatalogSnapshot { tx })
            .await;
        self.coordinator_client
            .metrics()
            .catalog_snapshot_seconds
            .with_label_values(&[context])
            .observe(start.elapsed().as_secs_f64());
        catalog
    }

    /// Sends a command to the coordinator and awaits its response on a oneshot channel.
    pub(crate) async fn call_coordinator<T, F>(&self, f: F) -> T
    where
        F: FnOnce(oneshot::Sender<T>) -> Command,
    {
        let (tx, rx) = oneshot::channel();
        self.coordinator_client.send(f(tx));
        rx.await
            .expect("if the coordinator is still alive, it shouldn't have dropped our call")
    }

    /// Acquire read holds on the given compute/storage collections, and
    /// determine the smallest common valid write frontier among the specified collections.
    ///
    /// Similar to `Coordinator::acquire_read_holds` and `TimestampProvider::least_valid_write`
    /// combined.
    ///
    /// Note: Unlike the Coordinator/StorageController's `least_valid_write`, which treats sinks
    /// specially when fetching storage frontiers (see `mz_storage_controller::collections_frontiers`),
    /// we intentionally do not special-case sinks here, because peeks never read from sinks.
    /// Therefore, using `StorageCollections::collections_frontiers` is sufficient.
    ///
    /// Note: `self` is taken as `&mut` because of the lazy fetching in
    /// `ensure_compute_instance_client`.
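    ///
    /// A minimal usage sketch (hypothetical caller; names are illustrative). The returned holds
    /// keep the inputs readable while a timestamp is chosen, and the returned frontier bounds
    /// how fresh a non-blocking read can be:
    ///
    /// ```ignore
    /// let (read_holds, least_valid_write) = peek_client
    ///     .acquire_read_holds_and_least_valid_write(&id_bundle)
    ///     .await?;
    /// // A peek timestamp must not be less than any held since, and a peek that should return
    /// // promptly picks a timestamp not in advance of `least_valid_write`.
    /// ```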
    pub async fn acquire_read_holds_and_least_valid_write(
        &mut self,
        id_bundle: &CollectionIdBundle,
    ) -> Result<(ReadHolds<Timestamp>, Antichain<Timestamp>), CollectionLookupError> {
        let mut read_holds = ReadHolds::new();
        let mut upper = Antichain::new();

        if !id_bundle.storage_ids.is_empty() {
            let desired_storage: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
            let storage_read_holds = self
                .storage_collections
                .acquire_read_holds(desired_storage)?;
            read_holds.storage_holds = storage_read_holds
                .into_iter()
                .map(|hold| (hold.id(), hold))
                .collect();

            let storage_ids: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
            for f in self
                .storage_collections
                .collections_frontiers(storage_ids)?
            {
                upper.extend(f.write_frontier);
            }
        }

        for (&instance_id, collection_ids) in &id_bundle.compute_ids {
            let client = self.ensure_compute_instance_client(instance_id).await?;

            for (id, read_hold, write_frontier) in client
                .acquire_read_holds_and_collection_write_frontiers(
                    collection_ids.iter().copied().collect(),
                )
                .await?
            {
                let prev = read_holds
                    .compute_holds
                    .insert((instance_id, id), read_hold);
                assert!(
                    prev.is_none(),
                    "duplicate compute ID in id_bundle {id_bundle:?}"
                );

                upper.extend(write_frontier);
            }
        }

        Ok((read_holds, upper))
    }

    /// Implement a fast-path peek plan.
    /// This is similar to `Coordinator::implement_peek_plan`, but only for fast-path peeks.
    ///
    /// Note: `self` is taken as `&mut` because of the lazy fetching in
    /// `ensure_compute_instance_client`.
    ///
    /// Note: `input_read_holds` has holds for all inputs. For fast-path peeks, this includes the
    /// peek target. For slow-path peeks (to be implemented later), we'll need to additionally call
    /// into the Controller to acquire a hold on the peek target after we create the dataflow.
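    ///
    /// A minimal usage sketch (hypothetical caller; most arguments are elided). The holds
    /// acquired for the plan's inputs are passed through so the peek target stays readable:
    ///
    /// ```ignore
    /// let (input_read_holds, _least_valid_write) = peek_client
    ///     .acquire_read_holds_and_least_valid_write(&id_bundle)
    ///     .await?;
    /// let response = peek_client
    ///     .implement_fast_path_peek_plan(fast_path, timestamp, finishing, cluster_id, /* … */)
    ///     .await?;
    /// ```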
    pub async fn implement_fast_path_peek_plan(
        &mut self,
        fast_path: FastPathPlan,
        timestamp: Timestamp,
        finishing: mz_expr::RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<mz_cluster_client::ReplicaId>,
        intermediate_result_type: mz_repr::SqlRelationType,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
        row_set_finishing_seconds: Histogram,
        input_read_holds: ReadHolds<Timestamp>,
        peek_stash_read_batch_size_bytes: usize,
        peek_stash_read_memory_budget_bytes: usize,
        conn_id: mz_adapter_types::connection::ConnectionId,
        depends_on: std::collections::BTreeSet<mz_repr::GlobalId>,
        watch_set: Option<WatchSetCreation>,
    ) -> Result<crate::ExecuteResponse, AdapterError> {
        // If the dataflow optimizes to a constant expression, we can immediately return the result.
        if let FastPathPlan::Constant(rows_res, _) = fast_path {
            // For constant queries with statement logging, immediately log that
            // dependencies are "ready" (trivially, because there are none).
            if let Some(ref ws) = watch_set {
                self.log_lifecycle_event(
                    ws.logging_id,
                    statement_logging::StatementLifecycleEvent::StorageDependenciesFinished,
                );
                self.log_lifecycle_event(
                    ws.logging_id,
                    statement_logging::StatementLifecycleEvent::ComputeDependenciesFinished,
                );
            }

            let mut rows = match rows_res {
                Ok(rows) => rows,
                Err(e) => return Err(e.into()),
            };
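            // Consolidate so that equal rows are combined into a single summed diff and
            // cancelled (zero) diffs drop out; any multiplicity that is still negative after
            // consolidation is reported as an error below.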
            consolidate(&mut rows);

            let mut results = Vec::new();
            for (row, count) in rows {
                let count = match u64::try_from(count.into_inner()) {
                    Ok(u) => usize::cast_from(u),
                    Err(_) => {
                        return Err(AdapterError::Unstructured(anyhow::anyhow!(
                            "Negative multiplicity in constant result: {}",
                            count
                        )));
                    }
                };
                match std::num::NonZeroUsize::new(count) {
                    Some(nzu) => {
                        results.push((row, nzu));
                    }
                    None => {
                        // No need to retain 0 diffs.
                    }
                };
            }
            let row_collection = RowCollection::new(results, &finishing.order_by);
            return match finishing.finish(
                row_collection,
                max_result_size,
                max_returned_query_size,
                &row_set_finishing_seconds,
            ) {
                Ok((rows, _bytes)) => Ok(Coordinator::send_immediate_rows(rows)),
                // TODO(peek-seq): make this a structured error. (also in the old sequencing)
                Err(e) => Err(AdapterError::ResultSize(e)),
            };
        }

        let (peek_target, target_read_hold, literal_constraints, mfp, strategy) = match fast_path {
            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, mfp) => {
                let peek_target = PeekTarget::Index { id: idx_id };
                let target_read_hold = input_read_holds
                    .compute_holds
                    .get(&(compute_instance, idx_id))
                    .expect("missing compute read hold on PeekExisting peek target")
                    .clone();
                let strategy = statement_logging::StatementExecutionStrategy::FastPath;
                (
                    peek_target,
                    target_read_hold,
                    literal_constraints,
                    mfp,
                    strategy,
                )
            }
            FastPathPlan::PeekPersist(coll_id, literal_constraint, mfp) => {
                let literal_constraints = literal_constraint.map(|r| vec![r]);
                let metadata = self
                    .storage_collections
                    .collection_metadata(coll_id)
                    .map_err(AdapterError::concurrent_dependency_drop_from_collection_missing)?
                    .clone();
                let peek_target = PeekTarget::Persist {
                    id: coll_id,
                    metadata,
                };
                let target_read_hold = input_read_holds
                    .storage_holds
                    .get(&coll_id)
                    .expect("missing storage read hold on PeekPersist peek target")
                    .clone();
                let strategy = statement_logging::StatementExecutionStrategy::PersistFastPath;
                (
                    peek_target,
                    target_read_hold,
                    literal_constraints,
                    mfp,
                    strategy,
                )
            }
            _ => {
                // FastPathPlan::Constant handled above.
                unreachable!()
            }
        };

        let (rows_tx, rows_rx) = oneshot::channel();
        let uuid = Uuid::new_v4();

        // At this stage we don't know column names for the result because we
        // only know the peek's result type as a bare SqlRelationType.
        let cols = (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
        let result_desc = RelationDesc::new(intermediate_result_type.clone(), cols);

        let client = self
            .ensure_compute_instance_client(compute_instance)
            .await
            .map_err(AdapterError::concurrent_dependency_drop_from_instance_missing)?;

        // Register coordinator tracking of this peek. This has to complete before issuing the peek.
        //
        // Warning: If we fail to actually issue the peek after this point, then we need to
        // unregister it to avoid an orphaned registration.
        self.call_coordinator(|tx| Command::RegisterFrontendPeek {
            uuid,
            conn_id: conn_id.clone(),
            cluster_id: compute_instance,
            depends_on,
            is_fast_path: true,
            watch_set,
            tx,
        })
        .await?;

        let finishing_for_instance = finishing.clone();
        let peek_result = client
            .peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing_for_instance,
                mfp,
                target_read_hold,
                target_replica,
                rows_tx,
            )
            .await;

        if let Err(err) = peek_result {
            // Clean up the registered peek since the peek failed to issue.
            // The frontend will handle statement logging for the error.
            self.call_coordinator(|tx| Command::UnregisterFrontendPeek { uuid, tx })
                .await;
            return Err(
                AdapterError::concurrent_dependency_drop_from_instance_peek_error(
                    err,
                    compute_instance,
                ),
            );
        }

        let peek_response_stream = Coordinator::create_peek_response_stream(
            rows_rx,
            finishing,
            max_result_size,
            max_returned_query_size,
            row_set_finishing_seconds,
            self.persist_client.clone(),
            peek_stash_read_batch_size_bytes,
            peek_stash_read_memory_budget_bytes,
        );

        Ok(crate::ExecuteResponse::SendingRowsStreaming {
            rows: Box::pin(peek_response_stream),
            instance_id: compute_instance,
            strategy,
        })
    }

    // Statement logging helper methods

    /// Log the beginning of statement execution.
    pub(crate) fn log_began_execution(
        &self,
        record: statement_logging::StatementBeganExecutionRecord,
        mseh_update: Row,
        prepared_statement: Option<PreparedStatementEvent>,
    ) {
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::BeganExecution {
                    record,
                    mseh_update,
                    prepared_statement,
                },
            ));
    }

    /// Log cluster selection for a statement.
    pub(crate) fn log_set_cluster(
        &self,
        id: StatementLoggingId,
        cluster_id: mz_controller_types::ClusterId,
    ) {
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::SetCluster { id, cluster_id },
            ));
    }

    /// Log timestamp determination for a statement.
    pub(crate) fn log_set_timestamp(&self, id: StatementLoggingId, timestamp: mz_repr::Timestamp) {
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::SetTimestamp { id, timestamp },
            ));
    }

    /// Log transient index ID for a statement.
    pub(crate) fn log_set_transient_index_id(
        &self,
        id: StatementLoggingId,
        transient_index_id: mz_repr::GlobalId,
    ) {
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::SetTransientIndex {
                    id,
                    transient_index_id,
                },
            ));
    }

    /// Log a statement lifecycle event.
    pub(crate) fn log_lifecycle_event(
        &self,
        id: StatementLoggingId,
        event: statement_logging::StatementLifecycleEvent,
    ) {
        let when = (self.statement_logging_frontend.now)();
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::Lifecycle { id, event, when },
            ));
    }

    /// Log the end of statement execution.
    pub(crate) fn log_ended_execution(
        &self,
        id: StatementLoggingId,
        reason: statement_logging::StatementEndedExecutionReason,
    ) {
        let ended_at = (self.statement_logging_frontend.now)();
        let record = statement_logging::StatementEndedExecutionRecord {
            id: id.0,
            reason,
            ended_at,
        };
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::EndedExecution(record),
            ));
    }
}

/// Errors arising during collection lookup in peek client operations.
#[derive(Error, Debug)]
pub enum CollectionLookupError {
    /// The specified compute instance does not exist.
    #[error("instance does not exist: {0}")]
    InstanceMissing(ComputeInstanceId),
    /// The specified compute instance has shut down.
    #[error("the instance has shut down")]
    InstanceShutDown,
    /// The compute collection does not exist.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
}

impl From<InstanceMissing> for CollectionLookupError {
    fn from(error: InstanceMissing) -> Self {
        Self::InstanceMissing(error.0)
    }
}

impl From<InstanceShutDown> for CollectionLookupError {
    fn from(_error: InstanceShutDown) -> Self {
        Self::InstanceShutDown
    }
}

impl From<CollectionMissing> for CollectionLookupError {
    fn from(error: CollectionMissing) -> Self {
        Self::CollectionMissing(error.0)
    }
}

impl From<AcquireReadHoldsError> for CollectionLookupError {
    fn from(error: AcquireReadHoldsError) -> Self {
        match error {
            AcquireReadHoldsError::CollectionMissing(id) => Self::CollectionMissing(id),
            AcquireReadHoldsError::InstanceShutDown => Self::InstanceShutDown,
        }
    }
}