mz_compute_client/controller/
instance_client.rs1use std::sync::Arc;
17
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::ComputeInstanceId;
21use mz_dyncfg::ConfigSet;
22use mz_expr::RowSetFinishing;
23use mz_ore::now::NowFn;
24use mz_ore::tracing::OpenTelemetryContext;
25use mz_persist_types::PersistLocation;
26use mz_repr::{GlobalId, RelationDesc, Row, Timestamp};
27use mz_storage_types::read_holds::{self, ReadHold};
28use thiserror::Error;
29use timely::progress::{Antichain, ChangeBatch};
30use tokio::sync::mpsc::error::SendError;
31use tokio::sync::{mpsc, oneshot};
32use tracing::debug_span;
33use uuid::Uuid;
34
35use crate::controller::error::CollectionMissing;
36use crate::controller::instance::{Command, Instance, SharedCollectionState};
37use crate::controller::{
38 ComputeControllerResponse, IntrospectionUpdates, ReplicaId, StorageCollections,
39};
40use crate::logging::LogVariant;
41use crate::metrics::InstanceMetrics;
42use crate::protocol::command::PeekTarget;
43use crate::protocol::response::PeekResponse;
44
45#[derive(Error, Debug)]
47#[error("the instance has shut down")]
48pub struct InstanceShutDown;
49
50#[derive(Error, Debug)]
52pub enum PeekError {
53 #[error("replica does not exist: {0}")]
55 ReplicaMissing(ReplicaId),
56 #[error("read hold ID does not match peeked collection: {0}")]
58 ReadHoldIdMismatch(GlobalId),
59 #[error("insufficient read hold provided: {0}")]
61 ReadHoldInsufficient(GlobalId),
62 #[error("the instance has shut down")]
64 InstanceShutDown,
65}
66
67impl From<InstanceShutDown> for PeekError {
68 fn from(_error: InstanceShutDown) -> Self {
69 Self::InstanceShutDown
70 }
71}
72
73#[derive(Error, Debug)]
75pub enum AcquireReadHoldsError {
76 #[error("collection does not exist: {0}")]
78 CollectionMissing(GlobalId),
79 #[error("the instance has shut down")]
81 InstanceShutDown,
82}
83
84impl From<CollectionMissing> for AcquireReadHoldsError {
85 fn from(error: CollectionMissing) -> Self {
86 Self::CollectionMissing(error.0)
87 }
88}
89
90impl From<InstanceShutDown> for AcquireReadHoldsError {
91 fn from(_error: InstanceShutDown) -> Self {
92 Self::InstanceShutDown
93 }
94}
95
96#[derive(Clone, derivative::Derivative)]
98#[derivative(Debug)]
99pub struct InstanceClient {
100 command_tx: mpsc::UnboundedSender<Command>,
102 #[derivative(Debug = "ignore")]
104 read_hold_tx: read_holds::ChangeTx,
105}
106
107impl InstanceClient {
108 pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx {
109 Arc::clone(&self.read_hold_tx)
110 }
111
112 pub(super) fn call<F>(&self, f: F) -> Result<(), InstanceShutDown>
115 where
116 F: FnOnce(&mut Instance) + Send + 'static,
117 {
118 let otel_ctx = OpenTelemetryContext::obtain();
119 self.command_tx
120 .send(Box::new(move |instance| {
121 let _span = debug_span!("instance_client::call").entered();
122 otel_ctx.attach_as_parent();
123
124 f(instance)
125 }))
126 .map_err(|_send_error| InstanceShutDown)
127 }
128
129 pub(super) async fn call_sync<F, R>(&self, f: F) -> Result<R, InstanceShutDown>
132 where
133 F: FnOnce(&mut Instance) -> R + Send + 'static,
134 R: Send + 'static,
135 {
136 let (tx, rx) = oneshot::channel();
137 let otel_ctx = OpenTelemetryContext::obtain();
138 self.command_tx
139 .send(Box::new(move |instance| {
140 let _span = debug_span!("instance_client::call_sync").entered();
141 otel_ctx.attach_as_parent();
142 let result = f(instance);
143 let _ = tx.send(result);
144 }))
145 .map_err(|_send_error| InstanceShutDown)?;
146
147 rx.await.map_err(|_| InstanceShutDown)
148 }
149
150 pub(super) fn spawn(
151 id: ComputeInstanceId,
152 build_info: &'static BuildInfo,
153 storage: StorageCollections,
154 peek_stash_persist_location: PersistLocation,
155 arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState)>,
156 metrics: InstanceMetrics,
157 now: NowFn,
158 wallclock_lag: WallclockLagFn<Timestamp>,
159 dyncfg: Arc<ConfigSet>,
160 response_tx: mpsc::UnboundedSender<ComputeControllerResponse>,
161 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
162 read_only: bool,
163 ) -> Self {
164 let (command_tx, command_rx) = mpsc::unbounded_channel();
165
166 let read_hold_tx: read_holds::ChangeTx = {
167 let command_tx = command_tx.clone();
168 Arc::new(move |id, change: ChangeBatch<_>| {
169 let cmd: Command = {
170 let change = change.clone();
171 Box::new(move |i| i.apply_read_hold_change(id, change))
172 };
173 command_tx.send(cmd).map_err(|_| SendError((id, change)))
174 })
175 };
176
177 mz_ore::task::spawn(
178 || format!("compute-instance-{id}"),
179 Instance::new(
180 build_info,
181 storage,
182 peek_stash_persist_location,
183 arranged_logs,
184 metrics,
185 now,
186 wallclock_lag,
187 dyncfg,
188 command_rx,
189 response_tx,
190 Arc::clone(&read_hold_tx),
191 introspection_tx,
192 read_only,
193 )
194 .run(),
195 );
196
197 Self {
198 command_tx,
199 read_hold_tx,
200 }
201 }
202
203 pub async fn acquire_read_holds_and_collection_write_frontiers(
206 &self,
207 ids: Vec<GlobalId>,
208 ) -> Result<Vec<(GlobalId, ReadHold, Antichain<Timestamp>)>, AcquireReadHoldsError> {
209 self.call_sync(move |i| {
210 let mut result = Vec::new();
211 for id in ids.into_iter() {
212 result.push((
213 id,
214 i.acquire_read_hold(id)?,
215 i.collection_write_frontier(id)?,
216 ));
217 }
218 Ok(result)
219 })
220 .await?
221 }
222
223 pub async fn peek(
227 &self,
228 peek_target: PeekTarget,
229 literal_constraints: Option<Vec<Row>>,
230 uuid: Uuid,
231 timestamp: Timestamp,
232 result_desc: RelationDesc,
233 finishing: RowSetFinishing,
234 map_filter_project: mz_expr::SafeMfpPlan,
235 target_read_hold: ReadHold,
236 target_replica: Option<ReplicaId>,
237 peek_response_tx: oneshot::Sender<PeekResponse>,
238 ) -> Result<(), PeekError> {
239 self.call_sync(move |i| {
240 i.peek(
241 peek_target,
242 literal_constraints,
243 uuid,
244 timestamp,
245 result_desc,
246 finishing,
247 map_filter_project,
248 target_read_hold,
249 target_replica,
250 peek_response_tx,
251 )
252 })
253 .await?
254 }
255}