Skip to main content

mz_adapter/
peek_client.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::sync::Arc;
12
13use differential_dataflow::consolidation::consolidate;
14use mz_compute_client::controller::error::{CollectionMissing, InstanceMissing};
15use mz_compute_client::controller::instance_client::InstanceClient;
16use mz_compute_client::controller::instance_client::{AcquireReadHoldsError, InstanceShutDown};
17use mz_compute_client::protocol::command::PeekTarget;
18use mz_compute_types::ComputeInstanceId;
19use mz_expr::row::RowCollection;
20use mz_ore::cast::CastFrom;
21use mz_persist_client::PersistClient;
22use mz_repr::GlobalId;
23use mz_repr::Timestamp;
24use mz_repr::global_id::TransientIdGen;
25use mz_repr::{RelationDesc, Row};
26use mz_sql::optimizer_metrics::OptimizerMetrics;
27use mz_sql::plan::Params;
28use mz_storage_types::sources::Timeline;
29use mz_timestamp_oracle::TimestampOracle;
30use prometheus::Histogram;
31use qcell::QCell;
32use thiserror::Error;
33use timely::progress::Antichain;
34use tokio::sync::oneshot;
35use uuid::Uuid;
36
37use crate::catalog::Catalog;
38use crate::command::{CatalogSnapshot, Command};
39use crate::coord::peek::FastPathPlan;
40use crate::coord::{Coordinator, ExecuteContextGuard};
41use crate::session::{LifecycleTimestamps, Session};
42use crate::statement_logging::{
43    FrontendStatementLoggingEvent, PreparedStatementEvent, PreparedStatementLoggingInfo,
44    StatementLoggingFrontend, StatementLoggingId, WatchSetCreation,
45};
46use crate::{AdapterError, Client, CollectionIdBundle, ReadHolds, statement_logging};
47
/// Shared, thread-safe handle to the storage collections.
///
/// In this file it is consulted to acquire storage read holds (`acquire_read_holds`), to read
/// write frontiers (`collections_frontiers`), and to look up the metadata of a collection that
/// is peeked directly from persist (`collection_metadata`).
pub type StorageCollectionsHandle =
    Arc<dyn mz_storage_client::storage_collections::StorageCollections + Send + Sync>;
51
/// Clients needed for peek sequencing in the Adapter Frontend.
///
/// Bundles the channel to the coordinator with lazily populated caches (compute instance clients,
/// timestamp oracles) so that peeks can be sequenced without going through the coordinator for
/// every step.
#[derive(Debug)]
pub struct PeekClient {
    /// Client used to send `Command`s to the coordinator (see `call_coordinator` and the
    /// `log_*` methods).
    coordinator_client: Client,
    /// Channels to talk to each compute Instance task directly. Lazily populated.
    /// Note that these are never cleaned up. In theory, this could lead to a very slow memory leak
    /// if a long-running user session keeps peeking on clusters that are being created and dropped
    /// in a hot loop. Hopefully this won't occur any time soon.
    compute_instances: BTreeMap<ComputeInstanceId, InstanceClient>,
    /// Handle to storage collections for reading frontiers and policies.
    pub storage_collections: StorageCollectionsHandle,
    /// A generator for transient `GlobalId`s, shared with Coordinator.
    pub transient_id_gen: Arc<TransientIdGen>,
    /// Optimizer metrics handle. It is not read by any method visible in this file; presumably
    /// it is consumed by the callers that run the optimizer -- TODO confirm.
    pub optimizer_metrics: OptimizerMetrics,
    /// Per-timeline oracles from the coordinator. Lazily populated.
    oracles: BTreeMap<Timeline, Arc<dyn TimestampOracle<Timestamp> + Send + Sync>>,
    /// Persist client; a clone is handed to the peek response stream created for fast-path
    /// peeks.
    persist_client: PersistClient,
    /// Statement logging state for frontend peek sequencing.
    pub statement_logging_frontend: StatementLoggingFrontend,
}
72
73impl PeekClient {
74    /// Creates a PeekClient.
75    pub fn new(
76        coordinator_client: Client,
77        storage_collections: StorageCollectionsHandle,
78        transient_id_gen: Arc<TransientIdGen>,
79        optimizer_metrics: OptimizerMetrics,
80        persist_client: PersistClient,
81        statement_logging_frontend: StatementLoggingFrontend,
82    ) -> Self {
83        Self {
84            coordinator_client,
85            compute_instances: Default::default(), // lazily populated
86            storage_collections,
87            transient_id_gen,
88            optimizer_metrics,
89            statement_logging_frontend,
90            oracles: Default::default(), // lazily populated
91            persist_client,
92        }
93    }
94
95    pub async fn ensure_compute_instance_client(
96        &mut self,
97        compute_instance: ComputeInstanceId,
98    ) -> Result<InstanceClient, InstanceMissing> {
99        if !self.compute_instances.contains_key(&compute_instance) {
100            let client = self
101                .call_coordinator(|tx| Command::GetComputeInstanceClient {
102                    instance_id: compute_instance,
103                    tx,
104                })
105                .await?;
106            self.compute_instances.insert(compute_instance, client);
107        }
108        Ok(self
109            .compute_instances
110            .get(&compute_instance)
111            .expect("ensured above")
112            .clone())
113    }
114
115    pub async fn ensure_oracle(
116        &mut self,
117        timeline: Timeline,
118    ) -> Result<&mut Arc<dyn TimestampOracle<Timestamp> + Send + Sync>, AdapterError> {
119        if !self.oracles.contains_key(&timeline) {
120            let oracle = self
121                .call_coordinator(|tx| Command::GetOracle {
122                    timeline: timeline.clone(),
123                    tx,
124                })
125                .await?;
126            self.oracles.insert(timeline.clone(), oracle);
127        }
128        Ok(self.oracles.get_mut(&timeline).expect("ensured above"))
129    }
130
131    /// Fetch a snapshot of the catalog for use in frontend peek sequencing.
132    /// Records the time taken in the adapter metrics, labeled by `context`.
133    pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
134        let start = std::time::Instant::now();
135        let CatalogSnapshot { catalog } = self
136            .call_coordinator(|tx| Command::CatalogSnapshot { tx })
137            .await;
138        self.coordinator_client
139            .metrics()
140            .catalog_snapshot_seconds
141            .with_label_values(&[context])
142            .observe(start.elapsed().as_secs_f64());
143        catalog
144    }
145
146    pub(crate) async fn call_coordinator<T, F>(&self, f: F) -> T
147    where
148        F: FnOnce(oneshot::Sender<T>) -> Command,
149    {
150        let (tx, rx) = oneshot::channel();
151        self.coordinator_client.send(f(tx));
152        rx.await
153            .expect("if the coordinator is still alive, it shouldn't have dropped our call")
154    }
155
156    /// Acquire read holds on the given compute/storage collections, and
157    /// determine the smallest common valid write frontier among the specified collections.
158    ///
159    /// Similar to `Coordinator::acquire_read_holds` and `TimestampProvider::least_valid_write`
160    /// combined.
161    ///
162    /// Note: Unlike the Coordinator/StorageController's `least_valid_write` that treats sinks
163    /// specially when fetching storage frontiers (see `mz_storage_controller::collections_frontiers`),
164    /// we intentionally do not special‑case sinks here because peeks never read from sinks.
165    /// Therefore, using `StorageCollections::collections_frontiers` is sufficient.
166    ///
167    /// Note: self is taken &mut because of the lazy fetching in `get_compute_instance_client`.
168    pub async fn acquire_read_holds_and_least_valid_write(
169        &mut self,
170        id_bundle: &CollectionIdBundle,
171    ) -> Result<(ReadHolds, Antichain<Timestamp>), CollectionLookupError> {
172        let mut read_holds = ReadHolds::new();
173        let mut upper = Antichain::new();
174
175        if !id_bundle.storage_ids.is_empty() {
176            let desired_storage: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
177            let storage_read_holds = self
178                .storage_collections
179                .acquire_read_holds(desired_storage)?;
180            read_holds.storage_holds = storage_read_holds
181                .into_iter()
182                .map(|hold| (hold.id(), hold))
183                .collect();
184
185            let storage_ids: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
186            for f in self
187                .storage_collections
188                .collections_frontiers(storage_ids)?
189            {
190                upper.extend(f.write_frontier);
191            }
192        }
193
194        for (&instance_id, collection_ids) in &id_bundle.compute_ids {
195            let client = self.ensure_compute_instance_client(instance_id).await?;
196
197            for (id, read_hold, write_frontier) in client
198                .acquire_read_holds_and_collection_write_frontiers(
199                    collection_ids.iter().copied().collect(),
200                )
201                .await?
202            {
203                let prev = read_holds
204                    .compute_holds
205                    .insert((instance_id, id), read_hold);
206                assert!(
207                    prev.is_none(),
208                    "duplicate compute ID in id_bundle {id_bundle:?}"
209                );
210
211                upper.extend(write_frontier);
212            }
213        }
214
215        Ok((read_holds, upper))
216    }
217
218    /// Implement a fast-path peek plan.
219    /// This is similar to `Coordinator::implement_peek_plan`, but only for fast path peeks.
220    ///
221    /// Note: self is taken &mut because of the lazy fetching in `get_compute_instance_client`.
222    ///
223    /// Note: `input_read_holds` has holds for all inputs. For fast-path peeks, this includes the
224    /// peek target. For slow-path peeks (to be implemented later), we'll need to additionally call
225    /// into the Controller to acquire a hold on the peek target after we create the dataflow.
226    pub async fn implement_fast_path_peek_plan(
227        &mut self,
228        fast_path: FastPathPlan,
229        timestamp: Timestamp,
230        finishing: mz_expr::RowSetFinishing,
231        compute_instance: ComputeInstanceId,
232        target_replica: Option<mz_cluster_client::ReplicaId>,
233        intermediate_result_type: mz_repr::SqlRelationType,
234        max_result_size: u64,
235        max_returned_query_size: Option<u64>,
236        row_set_finishing_seconds: Histogram,
237        input_read_holds: ReadHolds,
238        peek_stash_read_batch_size_bytes: usize,
239        peek_stash_read_memory_budget_bytes: usize,
240        conn_id: mz_adapter_types::connection::ConnectionId,
241        depends_on: std::collections::BTreeSet<mz_repr::GlobalId>,
242        watch_set: Option<WatchSetCreation>,
243    ) -> Result<crate::ExecuteResponse, AdapterError> {
244        // If the dataflow optimizes to a constant expression, we can immediately return the result.
245        if let FastPathPlan::Constant(rows_res, _) = fast_path {
246            // For constant queries with statement logging, immediately log that
247            // dependencies are "ready" (trivially, because there are none).
248            if let Some(ref ws) = watch_set {
249                self.log_lifecycle_event(
250                    ws.logging_id,
251                    statement_logging::StatementLifecycleEvent::StorageDependenciesFinished,
252                );
253                self.log_lifecycle_event(
254                    ws.logging_id,
255                    statement_logging::StatementLifecycleEvent::ComputeDependenciesFinished,
256                );
257            }
258
259            let mut rows = match rows_res {
260                Ok(rows) => rows,
261                Err(e) => return Err(e.into()),
262            };
263            consolidate(&mut rows);
264
265            let mut results = Vec::new();
266            for (row, count) in rows {
267                let count = match u64::try_from(count.into_inner()) {
268                    Ok(u) => usize::cast_from(u),
269                    Err(_) => {
270                        return Err(AdapterError::Unstructured(anyhow::anyhow!(
271                            "Negative multiplicity in constant result: {}",
272                            count
273                        )));
274                    }
275                };
276                match std::num::NonZeroUsize::new(count) {
277                    Some(nzu) => {
278                        results.push((row, nzu));
279                    }
280                    None => {
281                        // No need to retain 0 diffs.
282                    }
283                };
284            }
285            let row_collection = RowCollection::new(results, &finishing.order_by);
286            return match finishing.finish(
287                row_collection,
288                max_result_size,
289                max_returned_query_size,
290                &row_set_finishing_seconds,
291            ) {
292                Ok((rows, _bytes)) => Ok(Coordinator::send_immediate_rows(rows)),
293                // TODO(peek-seq): make this a structured error. (also in the old sequencing)
294                Err(e) => Err(AdapterError::ResultSize(e)),
295            };
296        }
297
298        let (peek_target, target_read_hold, literal_constraints, mfp, strategy) = match fast_path {
299            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, mfp) => {
300                let peek_target = PeekTarget::Index { id: idx_id };
301                let target_read_hold = input_read_holds
302                    .compute_holds
303                    .get(&(compute_instance, idx_id))
304                    .expect("missing compute read hold on PeekExisting peek target")
305                    .clone();
306                let strategy = statement_logging::StatementExecutionStrategy::FastPath;
307                (
308                    peek_target,
309                    target_read_hold,
310                    literal_constraints,
311                    mfp,
312                    strategy,
313                )
314            }
315            FastPathPlan::PeekPersist(coll_id, literal_constraint, mfp) => {
316                let literal_constraints = literal_constraint.map(|r| vec![r]);
317                let metadata = self
318                    .storage_collections
319                    .collection_metadata(coll_id)
320                    .map_err(AdapterError::concurrent_dependency_drop_from_collection_missing)?
321                    .clone();
322                let peek_target = PeekTarget::Persist {
323                    id: coll_id,
324                    metadata,
325                };
326                let target_read_hold = input_read_holds
327                    .storage_holds
328                    .get(&coll_id)
329                    .expect("missing storage read hold on PeekPersist peek target")
330                    .clone();
331                let strategy = statement_logging::StatementExecutionStrategy::PersistFastPath;
332                (
333                    peek_target,
334                    target_read_hold,
335                    literal_constraints,
336                    mfp,
337                    strategy,
338                )
339            }
340            FastPathPlan::Constant(..) => {
341                // FastPathPlan::Constant handled above.
342                unreachable!()
343            }
344        };
345
346        let (rows_tx, rows_rx) = oneshot::channel();
347        let uuid = Uuid::new_v4();
348
349        // At this stage we don't know column names for the result because we
350        // only know the peek's result type as a bare SqlRelationType.
351        let cols = (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
352        let result_desc = RelationDesc::new(intermediate_result_type.clone(), cols);
353
354        let client = self
355            .ensure_compute_instance_client(compute_instance)
356            .await
357            .map_err(AdapterError::concurrent_dependency_drop_from_instance_missing)?;
358
359        // Register coordinator tracking of this peek. This has to complete before issuing the peek.
360        //
361        // Warning: If we fail to actually issue the peek after this point, then we need to
362        // unregister it to avoid an orphaned registration.
363        self.call_coordinator(|tx| Command::RegisterFrontendPeek {
364            uuid,
365            conn_id: conn_id.clone(),
366            cluster_id: compute_instance,
367            depends_on,
368            is_fast_path: true,
369            watch_set,
370            tx,
371        })
372        .await?;
373
374        let finishing_for_instance = finishing.clone();
375        let peek_result = client
376            .peek(
377                peek_target,
378                literal_constraints,
379                uuid,
380                timestamp,
381                result_desc,
382                finishing_for_instance,
383                mfp,
384                target_read_hold,
385                target_replica,
386                rows_tx,
387            )
388            .await;
389
390        if let Err(err) = peek_result {
391            // Clean up the registered peek since the peek failed to issue.
392            // The frontend will handle statement logging for the error.
393            self.call_coordinator(|tx| Command::UnregisterFrontendPeek { uuid, tx })
394                .await;
395            return Err(
396                AdapterError::concurrent_dependency_drop_from_instance_peek_error(
397                    err,
398                    compute_instance,
399                ),
400            );
401        }
402
403        let peek_response_stream = Coordinator::create_peek_response_stream(
404            rows_rx,
405            finishing,
406            max_result_size,
407            max_returned_query_size,
408            row_set_finishing_seconds,
409            self.persist_client.clone(),
410            peek_stash_read_batch_size_bytes,
411            peek_stash_read_memory_budget_bytes,
412        );
413
414        Ok(crate::ExecuteResponse::SendingRowsStreaming {
415            rows: Box::pin(peek_response_stream),
416            instance_id: compute_instance,
417            strategy,
418        })
419    }
420
421    /// Set up statement logging for a frontend-sequenced operation.
422    ///
423    /// If `outer_ctx_extra` is `None`, begins a new statement execution log
424    /// entry. If `outer_ctx_extra` is `Some` (e.g. EXECUTE/FETCH), reuses and
425    /// retires the existing logging context.
426    ///
427    /// Returns a [`StatementLoggingGuard`]. Callers must
428    /// [`defuse`](StatementLoggingGuard::defuse) the guard when handing off
429    /// logging responsibility (or, in future, retire it explicitly on a
430    /// terminal outcome). Dropping the guard without retiring it emits an
431    /// `Aborted` end-execution event.
432    pub(crate) fn begin_statement_logging(
433        &self,
434        session: &mut Session,
435        params: &Params,
436        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
437        catalog: &Catalog,
438        lifecycle_timestamps: Option<LifecycleTimestamps>,
439        outer_ctx_extra: &mut Option<ExecuteContextGuard>,
440    ) -> StatementLoggingGuard {
441        let id = if outer_ctx_extra.is_none() {
442            // This is a new statement, so begin statement logging.
443            let result = self.statement_logging_frontend.begin_statement_execution(
444                session,
445                params,
446                logging,
447                catalog.system_config(),
448                lifecycle_timestamps,
449            );
450
451            if let Some((logging_id, began_execution, mseh_update, prepared_statement)) = result {
452                self.log_began_execution(began_execution, mseh_update, prepared_statement);
453                Some(logging_id)
454            } else {
455                None
456            }
457        } else {
458            // We're executing in the context of another statement (e.g. FETCH),
459            // so take ownership of the outer context and inherit its logging id
460            // (if any). The end of execution will be logged by the caller.
461            outer_ctx_extra
462                .take()
463                .and_then(|guard| guard.defuse().retire())
464        };
465
466        StatementLoggingGuard {
467            id,
468            coordinator_client: self.coordinator_client.clone(),
469            now: self.statement_logging_frontend.now.clone(),
470        }
471    }
472
473    /// Log the beginning of statement execution.
474    pub(crate) fn log_began_execution(
475        &self,
476        record: statement_logging::StatementBeganExecutionRecord,
477        mseh_update: Row,
478        prepared_statement: Option<PreparedStatementEvent>,
479    ) {
480        self.coordinator_client
481            .send(Command::FrontendStatementLogging(
482                FrontendStatementLoggingEvent::BeganExecution {
483                    record,
484                    mseh_update,
485                    prepared_statement,
486                },
487            ));
488    }
489
490    /// Log cluster selection for a statement.
491    pub(crate) fn log_set_cluster(
492        &self,
493        id: StatementLoggingId,
494        cluster_id: mz_controller_types::ClusterId,
495    ) {
496        self.coordinator_client
497            .send(Command::FrontendStatementLogging(
498                FrontendStatementLoggingEvent::SetCluster { id, cluster_id },
499            ));
500    }
501
502    /// Log timestamp determination for a statement.
503    pub(crate) fn log_set_timestamp(&self, id: StatementLoggingId, timestamp: mz_repr::Timestamp) {
504        self.coordinator_client
505            .send(Command::FrontendStatementLogging(
506                FrontendStatementLoggingEvent::SetTimestamp { id, timestamp },
507            ));
508    }
509
510    /// Log transient index ID for a statement.
511    pub(crate) fn log_set_transient_index_id(
512        &self,
513        id: StatementLoggingId,
514        transient_index_id: mz_repr::GlobalId,
515    ) {
516        self.coordinator_client
517            .send(Command::FrontendStatementLogging(
518                FrontendStatementLoggingEvent::SetTransientIndex {
519                    id,
520                    transient_index_id,
521                },
522            ));
523    }
524
525    /// Log a statement lifecycle event.
526    pub(crate) fn log_lifecycle_event(
527        &self,
528        id: StatementLoggingId,
529        event: statement_logging::StatementLifecycleEvent,
530    ) {
531        let when = (self.statement_logging_frontend.now)();
532        self.coordinator_client
533            .send(Command::FrontendStatementLogging(
534                FrontendStatementLoggingEvent::Lifecycle { id, event, when },
535            ));
536    }
537
538    /// Emit a `FrontendStatementLoggingEvent::EndedExecution` for the given
539    /// logging id. Used by callers that manage the statement-logging
540    /// lifecycle explicitly (see `try_frontend_peek`), rather than via the
541    /// RAII [`StatementLoggingGuard`].
542    pub(crate) fn log_ended_execution(
543        &self,
544        id: StatementLoggingId,
545        reason: statement_logging::StatementEndedExecutionReason,
546    ) {
547        let ended_at = (self.statement_logging_frontend.now)();
548        let record = statement_logging::StatementEndedExecutionRecord {
549            id: id.0,
550            reason,
551            ended_at,
552        };
553        self.coordinator_client
554            .send(Command::FrontendStatementLogging(
555                FrontendStatementLoggingEvent::EndedExecution(record),
556            ));
557    }
558}
559
/// RAII guard owning a frontend statement-logging lifecycle.
///
/// Created by [`PeekClient::begin_statement_logging`]. Unless logging
/// responsibility is handed off via [`defuse`](StatementLoggingGuard::defuse),
/// the guard ensures that every statement for which `BeganExecution` was logged
/// also receives a corresponding `EndedExecution`, even on early-return, panic,
/// or mid-flight drop of the enclosing future: if the guard is dropped without
/// being defused, it emits `StatementEndedExecutionReason::Aborted`.
///
/// When the guard is `defuse`d, some other component (e.g. the coordinator, for
/// streaming peek / subscribe responses) takes over and logs `EndedExecution`
/// itself.
///
/// For non-sampled statements the guard still exists but carries no id, and
/// retirement / drop are no-ops.
#[must_use = "StatementLoggingGuard must be explicitly retired or handed off; \
              otherwise `Drop` will log the statement as Aborted"]
pub(crate) struct StatementLoggingGuard {
    /// `None` if the statement was not sampled for logging, or once the guard has been defused
    /// or has already emitted its end-of-execution event.
    id: Option<StatementLoggingId>,
    /// Client used to send the `EndedExecution` event to the coordinator.
    coordinator_client: Client,
    /// Clock used to timestamp the `EndedExecution` record.
    now: mz_ore::now::NowFn,
}
583
584impl StatementLoggingGuard {
585    /// Returns the logging id, if this statement is being logged.
586    pub(crate) fn id(&self) -> Option<StatementLoggingId> {
587        self.id
588    }
589
590    /// Hands off logging responsibility without emitting an end-execution
591    /// event. Use when another component (e.g. the coordinator, for streaming
592    /// peek / subscribe responses) will log the end asynchronously.
593    pub(crate) fn defuse(mut self) {
594        self.id = None;
595    }
596
597    fn emit(&mut self, reason: statement_logging::StatementEndedExecutionReason) {
598        let Some(id) = self.id.take() else {
599            return;
600        };
601        let ended_at = (self.now)();
602        let record = statement_logging::StatementEndedExecutionRecord {
603            id: id.0,
604            reason,
605            ended_at,
606        };
607        self.coordinator_client
608            .send(Command::FrontendStatementLogging(
609                FrontendStatementLoggingEvent::EndedExecution(record),
610            ));
611    }
612}
613
614impl Drop for StatementLoggingGuard {
615    fn drop(&mut self) {
616        // `emit` is a no-op if the guard was already retired or defused (i.e.
617        // `id` is `None`).
618        self.emit(statement_logging::StatementEndedExecutionReason::Aborted);
619    }
620}
621
/// Errors arising during collection lookup in peek client operations.
///
/// This is the error type of `PeekClient::acquire_read_holds_and_least_valid_write`; the `From`
/// impls in this file let the underlying controller errors be propagated with `?`.
#[derive(Error, Debug)]
pub enum CollectionLookupError {
    /// The specified compute instance does not exist.
    #[error("instance does not exist: {0}")]
    InstanceMissing(ComputeInstanceId),
    /// The specified compute instance has shut down.
    #[error("the instance has shut down")]
    InstanceShutDown,
    /// A requested collection does not exist. Carries the id of the missing collection.
    #[error("collection does not exist: {0}")]
    CollectionMissing(GlobalId),
}
635
636impl From<InstanceMissing> for CollectionLookupError {
637    fn from(error: InstanceMissing) -> Self {
638        Self::InstanceMissing(error.0)
639    }
640}
641
642impl From<InstanceShutDown> for CollectionLookupError {
643    fn from(_error: InstanceShutDown) -> Self {
644        Self::InstanceShutDown
645    }
646}
647
648impl From<CollectionMissing> for CollectionLookupError {
649    fn from(error: CollectionMissing) -> Self {
650        Self::CollectionMissing(error.0)
651    }
652}
653
654impl From<AcquireReadHoldsError> for CollectionLookupError {
655    fn from(error: AcquireReadHoldsError) -> Self {
656        match error {
657            AcquireReadHoldsError::CollectionMissing(id) => Self::CollectionMissing(id),
658            AcquireReadHoldsError::InstanceShutDown => Self::InstanceShutDown,
659        }
660    }
661}