
mz_adapter/peek_client.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::sync::Arc;

use differential_dataflow::consolidation::consolidate;
use mz_compute_client::controller::error::{CollectionLookupError, InstanceMissing};
use mz_compute_client::protocol::command::PeekTarget;
use mz_compute_types::ComputeInstanceId;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_persist_client::PersistClient;
use mz_repr::Timestamp;
use mz_repr::global_id::TransientIdGen;
use mz_repr::{RelationDesc, Row};
use mz_sql::optimizer_metrics::OptimizerMetrics;
use mz_storage_types::sources::Timeline;
use mz_timestamp_oracle::TimestampOracle;
use prometheus::Histogram;
use timely::progress::Antichain;
use tokio::sync::oneshot;
use uuid::Uuid;

use crate::catalog::Catalog;
use crate::command::{CatalogSnapshot, Command};
use crate::coord::Coordinator;
use crate::coord::peek::FastPathPlan;
use crate::statement_logging::WatchSetCreation;
use crate::statement_logging::{
    FrontendStatementLoggingEvent, PreparedStatementEvent, StatementLoggingFrontend,
    StatementLoggingId,
};
use crate::{AdapterError, Client, CollectionIdBundle, ReadHolds, statement_logging};

/// Type alias for the storage collections trait object that we consult for sinces and write frontiers.
pub type StorageCollectionsHandle = Arc<
    dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = Timestamp>
        + Send
        + Sync,
>;

/// Clients needed for peek sequencing in the Adapter Frontend.
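///
/// A frontend peek roughly flows through this client as follows (a sketch only; the
/// exact sequencing lives in the frontend, and `id_bundle`, `plan`, `ts`, and the
/// `"peek"` label are illustrative placeholders):
/// ```ignore
/// let catalog = peek_client.catalog_snapshot("peek").await;
/// let (holds, upper) = peek_client
///     .acquire_read_holds_and_least_valid_write(&id_bundle)
///     .await?;
/// // ...choose a timestamp and optimize to a `FastPathPlan`...
/// let response = peek_client
///     .implement_fast_path_peek_plan(plan, ts, /* ... */)
///     .await?;
/// ```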
#[derive(Debug)]
pub struct PeekClient {
    coordinator_client: Client,
    /// Channels to talk to each compute Instance task directly. Lazily populated.
    /// Note that these are never cleaned up. In theory, this could lead to a very slow memory leak
    /// if a long-running user session keeps peeking on clusters that are being created and dropped
    /// in a hot loop. Hopefully this won't occur any time soon.
    compute_instances:
        BTreeMap<ComputeInstanceId, mz_compute_client::controller::instance::Client<Timestamp>>,
    /// Handle to storage collections for reading frontiers and policies.
    pub storage_collections: StorageCollectionsHandle,
    /// A generator for transient `GlobalId`s, shared with Coordinator.
    pub transient_id_gen: Arc<TransientIdGen>,
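    /// Metrics recorded while optimizing peeks in the frontend.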
    pub optimizer_metrics: OptimizerMetrics,
    /// Per-timeline oracles from the coordinator. Lazily populated.
    oracles: BTreeMap<Timeline, Arc<dyn TimestampOracle<Timestamp> + Send + Sync>>,
    persist_client: PersistClient,
    /// Statement logging state for frontend peek sequencing.
    pub statement_logging_frontend: StatementLoggingFrontend,
}

impl PeekClient {
    /// Creates a PeekClient.
    pub fn new(
        coordinator_client: Client,
        storage_collections: StorageCollectionsHandle,
        transient_id_gen: Arc<TransientIdGen>,
        optimizer_metrics: OptimizerMetrics,
        persist_client: PersistClient,
        statement_logging_frontend: StatementLoggingFrontend,
    ) -> Self {
        Self {
            coordinator_client,
            compute_instances: Default::default(), // lazily populated
            storage_collections,
            transient_id_gen,
            optimizer_metrics,
            statement_logging_frontend,
            oracles: Default::default(), // lazily populated
            persist_client,
        }
    }

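    /// Returns a client for talking directly to the given compute instance, fetching it
    /// from the coordinator and caching it on first use.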
    pub async fn ensure_compute_instance_client(
        &mut self,
        compute_instance: ComputeInstanceId,
    ) -> Result<mz_compute_client::controller::instance::Client<Timestamp>, InstanceMissing> {
        if !self.compute_instances.contains_key(&compute_instance) {
            let client = self
                .call_coordinator(|tx| Command::GetComputeInstanceClient {
                    instance_id: compute_instance,
                    tx,
                })
                .await?;
            self.compute_instances.insert(compute_instance, client);
        }
        Ok(self
            .compute_instances
            .get(&compute_instance)
            .expect("ensured above")
            .clone())
    }

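    /// Returns the timestamp oracle for the given timeline, fetching it from the
    /// coordinator and caching it on first use.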
    pub async fn ensure_oracle(
        &mut self,
        timeline: Timeline,
    ) -> Result<&mut Arc<dyn TimestampOracle<Timestamp> + Send + Sync>, AdapterError> {
        if !self.oracles.contains_key(&timeline) {
            let oracle = self
                .call_coordinator(|tx| Command::GetOracle {
                    timeline: timeline.clone(),
                    tx,
                })
                .await?;
            self.oracles.insert(timeline.clone(), oracle);
        }
        Ok(self.oracles.get_mut(&timeline).expect("ensured above"))
    }

    /// Fetch a snapshot of the catalog for use in frontend peek sequencing.
    /// Records the time taken in the adapter metrics, labeled by `context`.
    pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
        let start = std::time::Instant::now();
        let CatalogSnapshot { catalog } = self
            .call_coordinator(|tx| Command::CatalogSnapshot { tx })
            .await;
        self.coordinator_client
            .metrics()
            .catalog_snapshot_seconds
            .with_label_values(&[context])
            .observe(start.elapsed().as_secs_f64());
        catalog
    }

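    /// Sends a command to the coordinator and awaits the response on a oneshot channel.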
    pub(crate) async fn call_coordinator<T, F>(&self, f: F) -> T
    where
        F: FnOnce(oneshot::Sender<T>) -> Command,
    {
        let (tx, rx) = oneshot::channel();
        self.coordinator_client.send(f(tx));
        rx.await
            .expect("if the coordinator is still alive, it shouldn't have dropped our call")
    }

    /// Acquire read holds on the given compute/storage collections, and
    /// determine the smallest common valid write frontier among the specified collections.
    ///
    /// Similar to `Coordinator::acquire_read_holds` and `TimestampProvider::least_valid_write`
    /// combined.
    ///
    /// Note: Unlike the Coordinator/StorageController's `least_valid_write`, which treats sinks
    /// specially when fetching storage frontiers (see `mz_storage_controller::collections_frontiers`),
    /// we intentionally do not special-case sinks here because peeks never read from sinks.
    /// Therefore, using `StorageCollections::collections_frontiers` is sufficient.
    ///
    /// Note: `self` is taken `&mut` because of the lazy fetching in
    /// `ensure_compute_instance_client`.
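    ///
    /// A caller might use the result roughly like this (a sketch; `id_bundle` and the
    /// timestamp choice are illustrative):
    /// ```ignore
    /// let (holds, least_valid_write) = peek_client
    ///     .acquire_read_holds_and_least_valid_write(&id_bundle)
    ///     .await?;
    /// // The chosen peek timestamp must lie at or beyond each hold's since; a common
    /// // choice is the largest timestamp not in advance of `least_valid_write`.
    /// ```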
    pub async fn acquire_read_holds_and_least_valid_write(
        &mut self,
        id_bundle: &CollectionIdBundle,
    ) -> Result<(ReadHolds<Timestamp>, Antichain<Timestamp>), CollectionLookupError> {
        let mut read_holds = ReadHolds::new();
        let mut upper = Antichain::new();

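        // Storage collections: acquire read holds, then fold each collection's write
        // frontier into `upper`. `Antichain::extend` retains only minimal elements, so
        // `upper` ends up as the least write frontier across all inputs.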
        if !id_bundle.storage_ids.is_empty() {
            let desired_storage: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
            let storage_read_holds = self
                .storage_collections
                .acquire_read_holds(desired_storage)?;
            read_holds.storage_holds = storage_read_holds
                .into_iter()
                .map(|hold| (hold.id(), hold))
                .collect();

            let storage_ids: Vec<_> = id_bundle.storage_ids.iter().copied().collect();
            for f in self
                .storage_collections
                .collections_frontiers(storage_ids)?
            {
                upper.extend(f.write_frontier);
            }
        }

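        // Compute collections: for each instance, go through that instance's client to
        // acquire read holds and fetch write frontiers in a single call.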
        for (&instance_id, collection_ids) in &id_bundle.compute_ids {
            let client = self.ensure_compute_instance_client(instance_id).await?;

            for (id, read_hold, write_frontier) in client
                .acquire_read_holds_and_collection_write_frontiers(
                    collection_ids.iter().copied().collect(),
                )
                .await?
            {
                let prev = read_holds
                    .compute_holds
                    .insert((instance_id, id), read_hold);
                assert!(
                    prev.is_none(),
                    "duplicate compute ID in id_bundle {id_bundle:?}"
                );

                upper.extend(write_frontier);
            }
        }

        Ok((read_holds, upper))
    }

    /// Implement a fast-path peek plan.
    /// This is similar to `Coordinator::implement_peek_plan`, but only for fast-path peeks.
    ///
    /// Note: `self` is taken `&mut` because of the lazy fetching in
    /// `ensure_compute_instance_client`.
    ///
    /// Note: `input_read_holds` has holds for all inputs. For fast-path peeks, this includes the
    /// peek target. For slow-path peeks (to be implemented later), we'll need to additionally call
    /// into the Controller to acquire a hold on the peek target after we create the dataflow.
    pub async fn implement_fast_path_peek_plan(
        &mut self,
        fast_path: FastPathPlan,
        timestamp: Timestamp,
        finishing: mz_expr::RowSetFinishing,
        compute_instance: ComputeInstanceId,
        target_replica: Option<mz_cluster_client::ReplicaId>,
        intermediate_result_type: mz_repr::SqlRelationType,
        max_result_size: u64,
        max_returned_query_size: Option<u64>,
        row_set_finishing_seconds: Histogram,
        input_read_holds: ReadHolds<Timestamp>,
        peek_stash_read_batch_size_bytes: usize,
        peek_stash_read_memory_budget_bytes: usize,
        conn_id: mz_adapter_types::connection::ConnectionId,
        depends_on: std::collections::BTreeSet<mz_repr::GlobalId>,
        watch_set: Option<WatchSetCreation>,
    ) -> Result<crate::ExecuteResponse, AdapterError> {
        // If the dataflow optimizes to a constant expression, we can immediately return the result.
        if let FastPathPlan::Constant(rows_res, _) = fast_path {
            // For constant queries with statement logging, immediately log that
            // dependencies are "ready" (trivially, because there are none).
            if let Some(ref ws) = watch_set {
                self.log_lifecycle_event(
                    ws.logging_id,
                    statement_logging::StatementLifecycleEvent::StorageDependenciesFinished,
                );
                self.log_lifecycle_event(
                    ws.logging_id,
                    statement_logging::StatementLifecycleEvent::ComputeDependenciesFinished,
                );
            }

            let mut rows = match rows_res {
                Ok(rows) => rows,
                Err(e) => return Err(e.into()),
            };
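            // Consolidate equal rows by summing their diffs; rows whose diffs cancel to
            // zero disappear entirely.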
            consolidate(&mut rows);

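            // Constant results carry signed multiplicities: reject negative counts as an
            // error and drop zero counts, since there is nothing to retain.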
            let mut results = Vec::new();
            for (row, count) in rows {
                let count = match u64::try_from(count.into_inner()) {
                    Ok(u) => usize::cast_from(u),
                    Err(_) => {
                        return Err(AdapterError::Unstructured(anyhow::anyhow!(
                            "Negative multiplicity in constant result: {}",
                            count
                        )));
                    }
                };
                match std::num::NonZeroUsize::new(count) {
                    Some(nzu) => {
                        results.push((row, nzu));
                    }
                    None => {
                        // No need to retain 0 diffs.
                    }
                };
            }
            let row_collection = RowCollection::new(results, &finishing.order_by);
            return match finishing.finish(
                row_collection,
                max_result_size,
                max_returned_query_size,
                &row_set_finishing_seconds,
            ) {
                Ok((rows, _bytes)) => Ok(Coordinator::send_immediate_rows(rows)),
                // TODO(peek-seq): make this a structured error. (also in the old sequencing)
                Err(e) => Err(AdapterError::ResultSize(e)),
            };
        }

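        // Not a constant: the peek either reads an existing index arrangement or reads
        // directly from a persist-backed collection. Pick the target, grab its read hold
        // from `input_read_holds`, and note the execution strategy for statement logging.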
        let (peek_target, target_read_hold, literal_constraints, mfp, strategy) = match fast_path {
            FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, mfp) => {
                let peek_target = PeekTarget::Index { id: idx_id };
                let target_read_hold = input_read_holds
                    .compute_holds
                    .get(&(compute_instance, idx_id))
                    .expect("missing compute read hold on PeekExisting peek target")
                    .clone();
                let strategy = statement_logging::StatementExecutionStrategy::FastPath;
                (
                    peek_target,
                    target_read_hold,
                    literal_constraints,
                    mfp,
                    strategy,
                )
            }
            FastPathPlan::PeekPersist(coll_id, literal_constraint, mfp) => {
                let literal_constraints = literal_constraint.map(|r| vec![r]);
                let metadata = self
                    .storage_collections
                    .collection_metadata(coll_id)
                    .map_err(AdapterError::concurrent_dependency_drop_from_collection_missing)?
                    .clone();
                let peek_target = PeekTarget::Persist {
                    id: coll_id,
                    metadata,
                };
                let target_read_hold = input_read_holds
                    .storage_holds
                    .get(&coll_id)
                    .expect("missing storage read hold on PeekPersist peek target")
                    .clone();
                let strategy = statement_logging::StatementExecutionStrategy::PersistFastPath;
                (
                    peek_target,
                    target_read_hold,
                    literal_constraints,
                    mfp,
                    strategy,
                )
            }
            _ => {
                // FastPathPlan::Constant handled above.
                unreachable!()
            }
        };

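        // The compute instance will send the peek response on `rows_tx`; `uuid` identifies
        // this pending peek both to the coordinator (for tracking and cancellation) and to
        // the compute protocol.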
        let (rows_tx, rows_rx) = oneshot::channel();
        let uuid = Uuid::new_v4();

        // At this stage we don't know column names for the result because we
        // only know the peek's result type as a bare SqlRelationType.
        let cols = (0..intermediate_result_type.arity()).map(|i| format!("peek_{i}"));
        let result_desc = RelationDesc::new(intermediate_result_type.clone(), cols);

        let client = self
            .ensure_compute_instance_client(compute_instance)
            .await
            .map_err(AdapterError::concurrent_dependency_drop_from_instance_missing)?;

        // Register coordinator tracking of this peek. This has to complete before issuing the peek.
        //
        // Warning: If we fail to actually issue the peek after this point, then we need to
        // unregister it to avoid an orphaned registration.
        self.call_coordinator(|tx| Command::RegisterFrontendPeek {
            uuid,
            conn_id: conn_id.clone(),
            cluster_id: compute_instance,
            depends_on,
            is_fast_path: true,
            watch_set,
            tx,
        })
        .await?;

        let finishing_for_instance = finishing.clone();
        let peek_result = client
            .peek(
                peek_target,
                literal_constraints,
                uuid,
                timestamp,
                result_desc,
                finishing_for_instance,
                mfp,
                target_read_hold,
                target_replica,
                rows_tx,
            )
            .await;

        if let Err(err) = peek_result {
            // Clean up the registered peek since the peek failed to issue.
            // The frontend will handle statement logging for the error.
            self.call_coordinator(|tx| Command::UnregisterFrontendPeek { uuid, tx })
                .await;
            return Err(
                AdapterError::concurrent_dependency_drop_from_instance_peek_error(
                    err,
                    compute_instance,
                ),
            );
        }

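        // Wrap the incoming response in a stream that applies the row-set finishing and,
        // for large results, reads stashed batches back from persist within the given
        // batch-size and memory budgets.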
        let peek_response_stream = Coordinator::create_peek_response_stream(
            rows_rx,
            finishing,
            max_result_size,
            max_returned_query_size,
            row_set_finishing_seconds,
            self.persist_client.clone(),
            peek_stash_read_batch_size_bytes,
            peek_stash_read_memory_budget_bytes,
        );

        Ok(crate::ExecuteResponse::SendingRowsStreaming {
            rows: Box::pin(peek_response_stream),
            instance_id: compute_instance,
            strategy,
        })
    }

    // Statement logging helper methods

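    // These helpers are fire-and-forget: they enqueue `FrontendStatementLogging` commands
    // on the coordinator's channel and do not wait for a response.
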
    /// Log the beginning of statement execution.
    pub(crate) fn log_began_execution(
        &self,
        record: statement_logging::StatementBeganExecutionRecord,
        mseh_update: Row,
        prepared_statement: Option<PreparedStatementEvent>,
    ) {
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::BeganExecution {
                    record,
                    mseh_update,
                    prepared_statement,
                },
            ));
    }

    /// Log cluster selection for a statement.
    pub(crate) fn log_set_cluster(
        &self,
        id: StatementLoggingId,
        cluster_id: mz_controller_types::ClusterId,
    ) {
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::SetCluster { id, cluster_id },
            ));
    }

    /// Log timestamp determination for a statement.
    pub(crate) fn log_set_timestamp(&self, id: StatementLoggingId, timestamp: mz_repr::Timestamp) {
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::SetTimestamp { id, timestamp },
            ));
    }

    /// Log transient index ID for a statement.
    pub(crate) fn log_set_transient_index_id(
        &self,
        id: StatementLoggingId,
        transient_index_id: mz_repr::GlobalId,
    ) {
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::SetTransientIndex {
                    id,
                    transient_index_id,
                },
            ));
    }

    /// Log a statement lifecycle event.
    pub(crate) fn log_lifecycle_event(
        &self,
        id: StatementLoggingId,
        event: statement_logging::StatementLifecycleEvent,
    ) {
        let when = (self.statement_logging_frontend.now)();
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::Lifecycle { id, event, when },
            ));
    }

    /// Log the end of statement execution.
    pub(crate) fn log_ended_execution(
        &self,
        id: StatementLoggingId,
        reason: statement_logging::StatementEndedExecutionReason,
    ) {
        let ended_at = (self.statement_logging_frontend.now)();
        let record = statement_logging::StatementEndedExecutionRecord {
            id: id.0,
            reason,
            ended_at,
        };
        self.coordinator_client
            .send(Command::FrontendStatementLogging(
                FrontendStatementLoggingEvent::EndedExecution(record),
            ));
    }
}
503}