Skip to main content

mz_compute_client/controller/
instance_client.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A client for communicating with a compute instance task.
11//!
12//! This module provides the public interface for external callers (like the adapter)
13//! to interact with compute instances directly for operations like peek sequencing.
14//! It also has a number of `pub(super)` functions, which are for the `ComputeController`'s benefit.
15
16use std::sync::Arc;
17
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::ComputeInstanceId;
21use mz_dyncfg::ConfigSet;
22use mz_expr::RowSetFinishing;
23use mz_ore::now::NowFn;
24use mz_ore::tracing::OpenTelemetryContext;
25use mz_persist_types::PersistLocation;
26use mz_repr::{GlobalId, RelationDesc, Row};
27use mz_storage_types::read_holds::{self, ReadHold};
28use thiserror::Error;
29use timely::progress::{Antichain, ChangeBatch};
30use tokio::sync::mpsc::error::SendError;
31use tokio::sync::{mpsc, oneshot};
32use tracing::debug_span;
33use uuid::Uuid;
34
35use crate::controller::error::CollectionMissing;
36use crate::controller::instance::{Command, Instance, SharedCollectionState};
37use crate::controller::{
38    ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, ReplicaId,
39    StorageCollections,
40};
41use crate::logging::LogVariant;
42use crate::metrics::InstanceMetrics;
43use crate::protocol::command::PeekTarget;
44use crate::protocol::response::PeekResponse;
45
46/// Error indicating the instance has shut down.
47#[derive(Error, Debug)]
48#[error("the instance has shut down")]
49pub struct InstanceShutDown;
50
51/// Errors arising during peek processing.
52#[derive(Error, Debug)]
53pub enum PeekError {
54    /// The replica that the peek was issued against does not exist.
55    #[error("replica does not exist: {0}")]
56    ReplicaMissing(ReplicaId),
57    /// The read hold that was passed in is against the wrong collection.
58    #[error("read hold ID does not match peeked collection: {0}")]
59    ReadHoldIdMismatch(GlobalId),
60    /// The read hold that was passed in is for a later time than the peek's timestamp.
61    #[error("insufficient read hold provided: {0}")]
62    ReadHoldInsufficient(GlobalId),
63    /// The peek's target instance has shut down.
64    #[error("the instance has shut down")]
65    InstanceShutDown,
66}
67
68impl From<InstanceShutDown> for PeekError {
69    fn from(_error: InstanceShutDown) -> Self {
70        Self::InstanceShutDown
71    }
72}
73
74/// Errors arising from [`InstanceClient::acquire_read_holds_and_collection_write_frontiers`].
75#[derive(Error, Debug)]
76pub enum AcquireReadHoldsError {
77    /// The compute collection does not exist.
78    #[error("collection does not exist: {0}")]
79    CollectionMissing(GlobalId),
80    /// The instance has shut down.
81    #[error("the instance has shut down")]
82    InstanceShutDown,
83}
84
85impl From<CollectionMissing> for AcquireReadHoldsError {
86    fn from(error: CollectionMissing) -> Self {
87        Self::CollectionMissing(error.0)
88    }
89}
90
91impl From<InstanceShutDown> for AcquireReadHoldsError {
92    fn from(_error: InstanceShutDown) -> Self {
93        Self::InstanceShutDown
94    }
95}
96
97/// A client for an `Instance` task.
98#[derive(Clone, derivative::Derivative)]
99#[derivative(Debug)]
100pub struct InstanceClient<T: ComputeControllerTimestamp> {
101    /// A sender for commands for the instance.
102    command_tx: mpsc::UnboundedSender<Command<T>>,
103    /// A sender for read hold changes for collections installed on the instance.
104    #[derivative(Debug = "ignore")]
105    read_hold_tx: read_holds::ChangeTx<T>,
106}
107
108impl<T: ComputeControllerTimestamp> InstanceClient<T> {
109    pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
110        Arc::clone(&self.read_hold_tx)
111    }
112
113    /// Call a method to be run on the instance task, by sending a message to the instance.
114    /// Does not wait for a response message.
115    pub(super) fn call<F>(&self, f: F) -> Result<(), InstanceShutDown>
116    where
117        F: FnOnce(&mut Instance<T>) + Send + 'static,
118    {
119        let otel_ctx = OpenTelemetryContext::obtain();
120        self.command_tx
121            .send(Box::new(move |instance| {
122                let _span = debug_span!("instance_client::call").entered();
123                otel_ctx.attach_as_parent();
124
125                f(instance)
126            }))
127            .map_err(|_send_error| InstanceShutDown)
128    }
129
130    /// Call a method to be run on the instance task, by sending a message to the instance and
131    /// waiting for a response message.
132    pub(super) async fn call_sync<F, R>(&self, f: F) -> Result<R, InstanceShutDown>
133    where
134        F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
135        R: Send + 'static,
136    {
137        let (tx, rx) = oneshot::channel();
138        let otel_ctx = OpenTelemetryContext::obtain();
139        self.command_tx
140            .send(Box::new(move |instance| {
141                let _span = debug_span!("instance_client::call_sync").entered();
142                otel_ctx.attach_as_parent();
143                let result = f(instance);
144                let _ = tx.send(result);
145            }))
146            .map_err(|_send_error| InstanceShutDown)?;
147
148        rx.await.map_err(|_| InstanceShutDown)
149    }
150
151    pub(super) fn spawn(
152        id: ComputeInstanceId,
153        build_info: &'static BuildInfo,
154        storage: StorageCollections<T>,
155        peek_stash_persist_location: PersistLocation,
156        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
157        metrics: InstanceMetrics,
158        now: NowFn,
159        wallclock_lag: WallclockLagFn<T>,
160        dyncfg: Arc<ConfigSet>,
161        response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
162        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
163        read_only: bool,
164    ) -> Self {
165        let (command_tx, command_rx) = mpsc::unbounded_channel();
166
167        let read_hold_tx: read_holds::ChangeTx<_> = {
168            let command_tx = command_tx.clone();
169            Arc::new(move |id, change: ChangeBatch<_>| {
170                let cmd: Command<_> = {
171                    let change = change.clone();
172                    Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
173                };
174                command_tx.send(cmd).map_err(|_| SendError((id, change)))
175            })
176        };
177
178        mz_ore::task::spawn(
179            || format!("compute-instance-{id}"),
180            Instance::new(
181                build_info,
182                storage,
183                peek_stash_persist_location,
184                arranged_logs,
185                metrics,
186                now,
187                wallclock_lag,
188                dyncfg,
189                command_rx,
190                response_tx,
191                Arc::clone(&read_hold_tx),
192                introspection_tx,
193                read_only,
194            )
195            .run(),
196        );
197
198        Self {
199            command_tx,
200            read_hold_tx,
201        }
202    }
203
204    /// Acquires a `ReadHold` and collection write frontier for each of the identified compute
205    /// collections.
206    pub async fn acquire_read_holds_and_collection_write_frontiers(
207        &self,
208        ids: Vec<GlobalId>,
209    ) -> Result<Vec<(GlobalId, ReadHold<T>, Antichain<T>)>, AcquireReadHoldsError> {
210        self.call_sync(move |i| {
211            let mut result = Vec::new();
212            for id in ids.into_iter() {
213                result.push((
214                    id,
215                    i.acquire_read_hold(id)?,
216                    i.collection_write_frontier(id)?,
217                ));
218            }
219            Ok(result)
220        })
221        .await?
222    }
223
224    /// Issue a peek by calling into the instance task.
225    ///
226    /// If this returns an error, then it didn't modify any `Instance` state.
227    pub async fn peek(
228        &self,
229        peek_target: PeekTarget,
230        literal_constraints: Option<Vec<Row>>,
231        uuid: Uuid,
232        timestamp: T,
233        result_desc: RelationDesc,
234        finishing: RowSetFinishing,
235        map_filter_project: mz_expr::SafeMfpPlan,
236        target_read_hold: ReadHold<T>,
237        target_replica: Option<ReplicaId>,
238        peek_response_tx: oneshot::Sender<PeekResponse>,
239    ) -> Result<(), PeekError> {
240        self.call_sync(move |i| {
241            i.peek(
242                peek_target,
243                literal_constraints,
244                uuid,
245                timestamp,
246                result_desc,
247                finishing,
248                map_filter_project,
249                target_read_hold,
250                target_replica,
251                peek_response_tx,
252            )
253        })
254        .await?
255    }
256}