Skip to main content

mz_compute_client/controller/
instance_client.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A client for communicating with a compute instance task.
11//!
12//! This module provides the public interface for external callers (like the adapter)
13//! to interact with compute instances directly for operations like peek sequencing.
14//! It also has a number of `pub(super)` functions, which are for the `ComputeController`'s benefit.
15
16use std::sync::Arc;
17
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::ComputeInstanceId;
21use mz_dyncfg::ConfigSet;
22use mz_expr::RowSetFinishing;
23use mz_ore::now::NowFn;
24use mz_ore::tracing::OpenTelemetryContext;
25use mz_persist_types::PersistLocation;
26use mz_repr::{GlobalId, RelationDesc, Row, Timestamp};
27use mz_storage_types::read_holds::{self, ReadHold};
28use thiserror::Error;
29use timely::progress::{Antichain, ChangeBatch};
30use tokio::sync::mpsc::error::SendError;
31use tokio::sync::{mpsc, oneshot};
32use tracing::debug_span;
33use uuid::Uuid;
34
35use crate::controller::error::CollectionMissing;
36use crate::controller::instance::{Command, Instance, SharedCollectionState};
37use crate::controller::{
38    ComputeControllerResponse, IntrospectionUpdates, ReplicaId, StorageCollections,
39};
40use crate::logging::LogVariant;
41use crate::metrics::InstanceMetrics;
42use crate::protocol::command::PeekTarget;
43use crate::protocol::response::PeekResponse;
44
45/// Error indicating the instance has shut down.
46#[derive(Error, Debug)]
47#[error("the instance has shut down")]
48pub struct InstanceShutDown;
49
50/// Errors arising during peek processing.
51#[derive(Error, Debug)]
52pub enum PeekError {
53    /// The replica that the peek was issued against does not exist.
54    #[error("replica does not exist: {0}")]
55    ReplicaMissing(ReplicaId),
56    /// The read hold that was passed in is against the wrong collection.
57    #[error("read hold ID does not match peeked collection: {0}")]
58    ReadHoldIdMismatch(GlobalId),
59    /// The read hold that was passed in is for a later time than the peek's timestamp.
60    #[error("insufficient read hold provided: {0}")]
61    ReadHoldInsufficient(GlobalId),
62    /// The peek's target instance has shut down.
63    #[error("the instance has shut down")]
64    InstanceShutDown,
65}
66
67impl From<InstanceShutDown> for PeekError {
68    fn from(_error: InstanceShutDown) -> Self {
69        Self::InstanceShutDown
70    }
71}
72
73/// Errors arising from [`InstanceClient::acquire_read_holds_and_collection_write_frontiers`].
74#[derive(Error, Debug)]
75pub enum AcquireReadHoldsError {
76    /// The compute collection does not exist.
77    #[error("collection does not exist: {0}")]
78    CollectionMissing(GlobalId),
79    /// The instance has shut down.
80    #[error("the instance has shut down")]
81    InstanceShutDown,
82}
83
84impl From<CollectionMissing> for AcquireReadHoldsError {
85    fn from(error: CollectionMissing) -> Self {
86        Self::CollectionMissing(error.0)
87    }
88}
89
90impl From<InstanceShutDown> for AcquireReadHoldsError {
91    fn from(_error: InstanceShutDown) -> Self {
92        Self::InstanceShutDown
93    }
94}
95
96/// A client for an `Instance` task.
97#[derive(Clone, derivative::Derivative)]
98#[derivative(Debug)]
99pub struct InstanceClient {
100    /// A sender for commands for the instance.
101    command_tx: mpsc::UnboundedSender<Command>,
102    /// A sender for read hold changes for collections installed on the instance.
103    #[derivative(Debug = "ignore")]
104    read_hold_tx: read_holds::ChangeTx,
105}
106
107impl InstanceClient {
108    pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx {
109        Arc::clone(&self.read_hold_tx)
110    }
111
112    /// Call a method to be run on the instance task, by sending a message to the instance.
113    /// Does not wait for a response message.
114    pub(super) fn call<F>(&self, f: F) -> Result<(), InstanceShutDown>
115    where
116        F: FnOnce(&mut Instance) + Send + 'static,
117    {
118        let otel_ctx = OpenTelemetryContext::obtain();
119        self.command_tx
120            .send(Box::new(move |instance| {
121                let _span = debug_span!("instance_client::call").entered();
122                otel_ctx.attach_as_parent();
123
124                f(instance)
125            }))
126            .map_err(|_send_error| InstanceShutDown)
127    }
128
129    /// Call a method to be run on the instance task, by sending a message to the instance and
130    /// waiting for a response message.
131    pub(super) async fn call_sync<F, R>(&self, f: F) -> Result<R, InstanceShutDown>
132    where
133        F: FnOnce(&mut Instance) -> R + Send + 'static,
134        R: Send + 'static,
135    {
136        let (tx, rx) = oneshot::channel();
137        let otel_ctx = OpenTelemetryContext::obtain();
138        self.command_tx
139            .send(Box::new(move |instance| {
140                let _span = debug_span!("instance_client::call_sync").entered();
141                otel_ctx.attach_as_parent();
142                let result = f(instance);
143                let _ = tx.send(result);
144            }))
145            .map_err(|_send_error| InstanceShutDown)?;
146
147        rx.await.map_err(|_| InstanceShutDown)
148    }
149
150    pub(super) fn spawn(
151        id: ComputeInstanceId,
152        build_info: &'static BuildInfo,
153        storage: StorageCollections,
154        peek_stash_persist_location: PersistLocation,
155        arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState)>,
156        metrics: InstanceMetrics,
157        now: NowFn,
158        wallclock_lag: WallclockLagFn<Timestamp>,
159        dyncfg: Arc<ConfigSet>,
160        response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
161        introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
162        read_only: bool,
163    ) -> Self {
164        let (command_tx, command_rx) = mpsc::unbounded_channel();
165
166        let read_hold_tx: read_holds::ChangeTx = {
167            let command_tx = command_tx.clone();
168            Arc::new(move |id, change: ChangeBatch<_>| {
169                let cmd: Command = {
170                    let change = change.clone();
171                    Box::new(move |i| i.apply_read_hold_change(id, change))
172                };
173                command_tx.send(cmd).map_err(|_| SendError((id, change)))
174            })
175        };
176
177        mz_ore::task::spawn(
178            || format!("compute-instance-{id}"),
179            Instance::new(
180                build_info,
181                storage,
182                peek_stash_persist_location,
183                arranged_logs,
184                metrics,
185                now,
186                wallclock_lag,
187                dyncfg,
188                command_rx,
189                response_tx,
190                Arc::clone(&read_hold_tx),
191                introspection_tx,
192                read_only,
193            )
194            .run(),
195        );
196
197        Self {
198            command_tx,
199            read_hold_tx,
200        }
201    }
202
203    /// Acquires a `ReadHold` and collection write frontier for each of the identified compute
204    /// collections.
205    pub async fn acquire_read_holds_and_collection_write_frontiers(
206        &self,
207        ids: Vec<GlobalId>,
208    ) -> Result<Vec<(GlobalId, ReadHold, Antichain<Timestamp>)>, AcquireReadHoldsError> {
209        self.call_sync(move |i| {
210            let mut result = Vec::new();
211            for id in ids.into_iter() {
212                result.push((
213                    id,
214                    i.acquire_read_hold(id)?,
215                    i.collection_write_frontier(id)?,
216                ));
217            }
218            Ok(result)
219        })
220        .await?
221    }
222
223    /// Issue a peek by calling into the instance task.
224    ///
225    /// If this returns an error, then it didn't modify any `Instance` state.
226    pub async fn peek(
227        &self,
228        peek_target: PeekTarget,
229        literal_constraints: Option<Vec<Row>>,
230        uuid: Uuid,
231        timestamp: Timestamp,
232        result_desc: RelationDesc,
233        finishing: RowSetFinishing,
234        map_filter_project: mz_expr::SafeMfpPlan,
235        target_read_hold: ReadHold,
236        target_replica: Option<ReplicaId>,
237        peek_response_tx: oneshot::Sender<PeekResponse>,
238    ) -> Result<(), PeekError> {
239        self.call_sync(move |i| {
240            i.peek(
241                peek_target,
242                literal_constraints,
243                uuid,
244                timestamp,
245                result_desc,
246                finishing,
247                map_filter_project,
248                target_read_hold,
249                target_replica,
250                peek_response_tx,
251            )
252        })
253        .await?
254    }
255}