mz_compute_client/controller/
instance_client.rs1use std::sync::Arc;
17
18use mz_build_info::BuildInfo;
19use mz_cluster_client::WallclockLagFn;
20use mz_compute_types::ComputeInstanceId;
21use mz_dyncfg::ConfigSet;
22use mz_expr::RowSetFinishing;
23use mz_ore::now::NowFn;
24use mz_ore::tracing::OpenTelemetryContext;
25use mz_persist_types::PersistLocation;
26use mz_repr::{GlobalId, RelationDesc, Row};
27use mz_storage_types::read_holds::{self, ReadHold};
28use thiserror::Error;
29use timely::progress::{Antichain, ChangeBatch};
30use tokio::sync::mpsc::error::SendError;
31use tokio::sync::{mpsc, oneshot};
32use tracing::debug_span;
33use uuid::Uuid;
34
35use crate::controller::error::CollectionMissing;
36use crate::controller::instance::{Command, Instance, SharedCollectionState};
37use crate::controller::{
38 ComputeControllerResponse, ComputeControllerTimestamp, IntrospectionUpdates, ReplicaId,
39 StorageCollections,
40};
41use crate::logging::LogVariant;
42use crate::metrics::InstanceMetrics;
43use crate::protocol::command::PeekTarget;
44use crate::protocol::response::PeekResponse;
45
46#[derive(Error, Debug)]
48#[error("the instance has shut down")]
49pub struct InstanceShutDown;
50
51#[derive(Error, Debug)]
53pub enum PeekError {
54 #[error("replica does not exist: {0}")]
56 ReplicaMissing(ReplicaId),
57 #[error("read hold ID does not match peeked collection: {0}")]
59 ReadHoldIdMismatch(GlobalId),
60 #[error("insufficient read hold provided: {0}")]
62 ReadHoldInsufficient(GlobalId),
63 #[error("the instance has shut down")]
65 InstanceShutDown,
66}
67
68impl From<InstanceShutDown> for PeekError {
69 fn from(_error: InstanceShutDown) -> Self {
70 Self::InstanceShutDown
71 }
72}
73
74#[derive(Error, Debug)]
76pub enum AcquireReadHoldsError {
77 #[error("collection does not exist: {0}")]
79 CollectionMissing(GlobalId),
80 #[error("the instance has shut down")]
82 InstanceShutDown,
83}
84
85impl From<CollectionMissing> for AcquireReadHoldsError {
86 fn from(error: CollectionMissing) -> Self {
87 Self::CollectionMissing(error.0)
88 }
89}
90
91impl From<InstanceShutDown> for AcquireReadHoldsError {
92 fn from(_error: InstanceShutDown) -> Self {
93 Self::InstanceShutDown
94 }
95}
96
97#[derive(Clone, derivative::Derivative)]
99#[derivative(Debug)]
100pub struct InstanceClient<T: ComputeControllerTimestamp> {
101 command_tx: mpsc::UnboundedSender<Command<T>>,
103 #[derivative(Debug = "ignore")]
105 read_hold_tx: read_holds::ChangeTx<T>,
106}
107
108impl<T: ComputeControllerTimestamp> InstanceClient<T> {
109 pub(super) fn read_hold_tx(&self) -> read_holds::ChangeTx<T> {
110 Arc::clone(&self.read_hold_tx)
111 }
112
113 pub(super) fn call<F>(&self, f: F) -> Result<(), InstanceShutDown>
116 where
117 F: FnOnce(&mut Instance<T>) + Send + 'static,
118 {
119 let otel_ctx = OpenTelemetryContext::obtain();
120 self.command_tx
121 .send(Box::new(move |instance| {
122 let _span = debug_span!("instance_client::call").entered();
123 otel_ctx.attach_as_parent();
124
125 f(instance)
126 }))
127 .map_err(|_send_error| InstanceShutDown)
128 }
129
130 pub(super) async fn call_sync<F, R>(&self, f: F) -> Result<R, InstanceShutDown>
133 where
134 F: FnOnce(&mut Instance<T>) -> R + Send + 'static,
135 R: Send + 'static,
136 {
137 let (tx, rx) = oneshot::channel();
138 let otel_ctx = OpenTelemetryContext::obtain();
139 self.command_tx
140 .send(Box::new(move |instance| {
141 let _span = debug_span!("instance_client::call_sync").entered();
142 otel_ctx.attach_as_parent();
143 let result = f(instance);
144 let _ = tx.send(result);
145 }))
146 .map_err(|_send_error| InstanceShutDown)?;
147
148 rx.await.map_err(|_| InstanceShutDown)
149 }
150
151 pub(super) fn spawn(
152 id: ComputeInstanceId,
153 build_info: &'static BuildInfo,
154 storage: StorageCollections<T>,
155 peek_stash_persist_location: PersistLocation,
156 arranged_logs: Vec<(LogVariant, GlobalId, SharedCollectionState<T>)>,
157 metrics: InstanceMetrics,
158 now: NowFn,
159 wallclock_lag: WallclockLagFn<T>,
160 dyncfg: Arc<ConfigSet>,
161 response_tx: mpsc::UnboundedSender<ComputeControllerResponse<T>>,
162 introspection_tx: mpsc::UnboundedSender<IntrospectionUpdates>,
163 read_only: bool,
164 ) -> Self {
165 let (command_tx, command_rx) = mpsc::unbounded_channel();
166
167 let read_hold_tx: read_holds::ChangeTx<_> = {
168 let command_tx = command_tx.clone();
169 Arc::new(move |id, change: ChangeBatch<_>| {
170 let cmd: Command<_> = {
171 let change = change.clone();
172 Box::new(move |i| i.apply_read_hold_change(id, change.clone()))
173 };
174 command_tx.send(cmd).map_err(|_| SendError((id, change)))
175 })
176 };
177
178 mz_ore::task::spawn(
179 || format!("compute-instance-{id}"),
180 Instance::new(
181 build_info,
182 storage,
183 peek_stash_persist_location,
184 arranged_logs,
185 metrics,
186 now,
187 wallclock_lag,
188 dyncfg,
189 command_rx,
190 response_tx,
191 Arc::clone(&read_hold_tx),
192 introspection_tx,
193 read_only,
194 )
195 .run(),
196 );
197
198 Self {
199 command_tx,
200 read_hold_tx,
201 }
202 }
203
204 pub async fn acquire_read_holds_and_collection_write_frontiers(
207 &self,
208 ids: Vec<GlobalId>,
209 ) -> Result<Vec<(GlobalId, ReadHold<T>, Antichain<T>)>, AcquireReadHoldsError> {
210 self.call_sync(move |i| {
211 let mut result = Vec::new();
212 for id in ids.into_iter() {
213 result.push((
214 id,
215 i.acquire_read_hold(id)?,
216 i.collection_write_frontier(id)?,
217 ));
218 }
219 Ok(result)
220 })
221 .await?
222 }
223
224 pub async fn peek(
228 &self,
229 peek_target: PeekTarget,
230 literal_constraints: Option<Vec<Row>>,
231 uuid: Uuid,
232 timestamp: T,
233 result_desc: RelationDesc,
234 finishing: RowSetFinishing,
235 map_filter_project: mz_expr::SafeMfpPlan,
236 target_read_hold: ReadHold<T>,
237 target_replica: Option<ReplicaId>,
238 peek_response_tx: oneshot::Sender<PeekResponse>,
239 ) -> Result<(), PeekError> {
240 self.call_sync(move |i| {
241 i.peek(
242 peek_target,
243 literal_constraints,
244 uuid,
245 timestamp,
246 result_desc,
247 finishing,
248 map_filter_project,
249 target_read_hold,
250 target_replica,
251 peek_response_tx,
252 )
253 })
254 .await?
255 }
256}