1use std::fmt::Debug;
11
12use itertools::Itertools;
13use mz_catalog::durable::{DurableCatalogError, FenceError};
14use mz_compute_client::controller::error::{
15 CollectionUpdateError, DataflowCreationError, InstanceMissing, PeekError, ReadPolicyError,
16};
17use mz_controller_types::ClusterId;
18use mz_ore::tracing::OpenTelemetryContext;
19use mz_ore::{exit, soft_assert_no_log};
20use mz_repr::{RelationDesc, RowIterator, ScalarType};
21use mz_sql::names::FullItemName;
22use mz_sql::plan::StatementDesc;
23use mz_sql::session::metadata::SessionMetadata;
24use mz_sql::session::vars::Var;
25use mz_sql_parser::ast::display::AstDisplay;
26use mz_sql_parser::ast::{
27 CreateIndexStatement, Ident, Raw, RawClusterName, RawItemName, Statement,
28};
29use mz_storage_types::controller::StorageError;
30use mz_transform::TransformError;
31use tokio::sync::mpsc::UnboundedSender;
32use tokio::sync::oneshot;
33
34use crate::catalog::{Catalog, CatalogState};
35use crate::command::{Command, Response};
36use crate::coord::{Message, PendingTxnResponse};
37use crate::error::AdapterError;
38use crate::session::{EndTransactionAction, Session};
39use crate::{ExecuteContext, ExecuteResponse};
40
41#[derive(Debug)]
43pub struct ClientTransmitter<T>
44where
45 T: Transmittable,
46 <T as Transmittable>::Allowed: 'static,
47{
48 tx: Option<oneshot::Sender<Response<T>>>,
49 internal_cmd_tx: UnboundedSender<Message>,
50 allowed: Option<&'static [T::Allowed]>,
53}
54
55impl<T: Transmittable + std::fmt::Debug> ClientTransmitter<T> {
56 pub fn new(
58 tx: oneshot::Sender<Response<T>>,
59 internal_cmd_tx: UnboundedSender<Message>,
60 ) -> ClientTransmitter<T> {
61 ClientTransmitter {
62 tx: Some(tx),
63 internal_cmd_tx,
64 allowed: None,
65 }
66 }
67
68 #[mz_ore::instrument(level = "debug")]
75 pub fn send(mut self, result: Result<T, AdapterError>, session: Session) {
76 soft_assert_no_log!(
78 match (&result, self.allowed.take()) {
79 (Ok(t), Some(allowed)) => allowed.contains(&t.to_allowed()),
80 _ => true,
81 },
82 "tried to send disallowed value {result:?} through ClientTransmitter; \
83 see ClientTransmitter::set_allowed"
84 );
85
86 if let Err(res) = self
89 .tx
90 .take()
91 .expect("tx will always be `Some` unless `self` has been consumed")
92 .send(Response {
93 result,
94 session,
95 otel_ctx: OpenTelemetryContext::obtain(),
96 })
97 {
98 self.internal_cmd_tx
99 .send(Message::Command(
100 OpenTelemetryContext::obtain(),
101 Command::Terminate {
102 conn_id: res.session.conn_id().clone(),
103 tx: None,
104 },
105 ))
106 .expect("coordinator unexpectedly gone");
107 }
108 }
109
110 pub fn take(mut self) -> oneshot::Sender<Response<T>> {
111 self.tx
112 .take()
113 .expect("tx will always be `Some` unless `self` has been consumed")
114 }
115
116 pub fn set_allowed(&mut self, allowed: &'static [T::Allowed]) {
120 self.allowed = Some(allowed);
121 }
122}
123
124pub trait Transmittable {
126 type Allowed: Eq + PartialEq + std::fmt::Debug;
128 fn to_allowed(&self) -> Self::Allowed;
138}
139
140impl Transmittable for () {
141 type Allowed = bool;
142
143 fn to_allowed(&self) -> Self::Allowed {
144 true
145 }
146}
147
148#[derive(Debug)]
150pub struct CompletedClientTransmitter {
151 ctx: ExecuteContext,
152 response: Result<PendingTxnResponse, AdapterError>,
153 action: EndTransactionAction,
154}
155
156impl CompletedClientTransmitter {
157 pub fn new(
159 ctx: ExecuteContext,
160 response: Result<PendingTxnResponse, AdapterError>,
161 action: EndTransactionAction,
162 ) -> Self {
163 CompletedClientTransmitter {
164 ctx,
165 response,
166 action,
167 }
168 }
169
170 pub fn finalize(mut self) -> (ExecuteContext, Result<ExecuteResponse, AdapterError>) {
172 let changed = self
173 .ctx
174 .session_mut()
175 .vars_mut()
176 .end_transaction(self.action);
177
178 let response = self.response.map(|mut r| {
180 r.extend_params(changed);
181 ExecuteResponse::from(r)
182 });
183
184 (self.ctx, response)
185 }
186}
187
188impl<T: Transmittable> Drop for ClientTransmitter<T> {
189 fn drop(&mut self) {
190 if self.tx.is_some() {
191 panic!("client transmitter dropped without send")
192 }
193 }
194}
195
196pub fn index_sql(
199 index_name: String,
200 cluster_id: ClusterId,
201 view_name: FullItemName,
202 view_desc: &RelationDesc,
203 keys: &[usize],
204) -> String {
205 use mz_sql::ast::{Expr, Value};
206
207 CreateIndexStatement::<Raw> {
208 name: Some(Ident::new_unchecked(index_name)),
209 on_name: RawItemName::Name(mz_sql::normalize::unresolve(view_name)),
210 in_cluster: Some(RawClusterName::Resolved(cluster_id.to_string())),
211 key_parts: Some(
212 keys.iter()
213 .map(|i| match view_desc.get_unambiguous_name(*i) {
214 Some(n) => Expr::Identifier(vec![Ident::new_unchecked(n.to_string())]),
215 _ => Expr::Value(Value::Number((i + 1).to_string())),
216 })
217 .collect(),
218 ),
219 with_options: vec![],
220 if_not_exists: false,
221 }
222 .to_ast_string_stable()
223}
224
225pub fn describe(
227 catalog: &Catalog,
228 stmt: Statement<Raw>,
229 param_types: &[Option<ScalarType>],
230 session: &Session,
231) -> Result<StatementDesc, AdapterError> {
232 let catalog = &catalog.for_session(session);
233 let (stmt, _) = mz_sql::names::resolve(catalog, stmt)?;
234 Ok(mz_sql::plan::describe(
235 session.pcx(),
236 catalog,
237 stmt,
238 param_types,
239 )?)
240}
241
242pub trait ResultExt<T> {
243 fn unwrap_or_terminate(self, context: &str) -> T;
247
248 fn maybe_terminate(self, context: &str) -> Self;
252}
253
254impl<T, E> ResultExt<T> for Result<T, E>
255where
256 E: ShouldTerminateGracefully + Debug,
257{
258 fn unwrap_or_terminate(self, context: &str) -> T {
259 match self {
260 Ok(t) => t,
261 Err(e) if e.should_terminate_gracefully() => exit!(0, "{context}: {e:?}"),
262 Err(e) => panic!("{context}: {e:?}"),
263 }
264 }
265
266 fn maybe_terminate(self, context: &str) -> Self {
267 if let Err(e) = &self {
268 if e.should_terminate_gracefully() {
269 exit!(0, "{context}: {e:?}");
270 }
271 }
272
273 self
274 }
275}
276
277trait ShouldTerminateGracefully {
280 fn should_terminate_gracefully(&self) -> bool;
283}
284
285impl ShouldTerminateGracefully for AdapterError {
286 fn should_terminate_gracefully(&self) -> bool {
287 match self {
288 AdapterError::Catalog(e) => e.should_terminate_gracefully(),
289 _ => false,
290 }
291 }
292}
293
294impl ShouldTerminateGracefully for mz_catalog::memory::error::Error {
295 fn should_terminate_gracefully(&self) -> bool {
296 match &self.kind {
297 mz_catalog::memory::error::ErrorKind::Durable(e) => e.should_terminate_gracefully(),
298 _ => false,
299 }
300 }
301}
302
303impl ShouldTerminateGracefully for mz_catalog::durable::CatalogError {
304 fn should_terminate_gracefully(&self) -> bool {
305 match &self {
306 Self::Durable(e) => e.should_terminate_gracefully(),
307 _ => false,
308 }
309 }
310}
311
312impl ShouldTerminateGracefully for DurableCatalogError {
313 fn should_terminate_gracefully(&self) -> bool {
314 match self {
315 DurableCatalogError::Fence(err) => err.should_terminate_gracefully(),
316 DurableCatalogError::IncompatibleDataVersion { .. }
317 | DurableCatalogError::IncompatiblePersistVersion { .. }
318 | DurableCatalogError::Proto(_)
319 | DurableCatalogError::Uninitialized
320 | DurableCatalogError::NotWritable(_)
321 | DurableCatalogError::DuplicateKey
322 | DurableCatalogError::UniquenessViolation
323 | DurableCatalogError::Storage(_)
324 | DurableCatalogError::Internal(_) => false,
325 }
326 }
327}
328
329impl ShouldTerminateGracefully for FenceError {
330 fn should_terminate_gracefully(&self) -> bool {
331 match self {
332 FenceError::DeployGeneration { .. } => true,
333 FenceError::Epoch { .. } | FenceError::MigrationUpper { .. } => false,
334 }
335 }
336}
337
338impl<T> ShouldTerminateGracefully for StorageError<T> {
339 fn should_terminate_gracefully(&self) -> bool {
340 match self {
341 StorageError::ResourceExhausted(_)
342 | StorageError::CollectionMetadataAlreadyExists(_)
343 | StorageError::PersistShardAlreadyInUse(_)
344 | StorageError::PersistSchemaEvolveRace { .. }
345 | StorageError::PersistInvalidSchemaEvolve { .. }
346 | StorageError::TxnWalShardAlreadyExists
347 | StorageError::UpdateBeyondUpper(_)
348 | StorageError::ReadBeforeSince(_)
349 | StorageError::InvalidUppers(_)
350 | StorageError::InvalidUsage(_)
351 | StorageError::CollectionIdReused(_)
352 | StorageError::SinkIdReused(_)
353 | StorageError::IdentifierMissing(_)
354 | StorageError::IdentifierInvalid(_)
355 | StorageError::IngestionInstanceMissing { .. }
356 | StorageError::ExportInstanceMissing { .. }
357 | StorageError::Generic(_)
358 | StorageError::ReadOnly
359 | StorageError::DataflowError(_)
360 | StorageError::InvalidAlter { .. }
361 | StorageError::ShuttingDown(_)
362 | StorageError::MissingSubsourceReference { .. }
363 | StorageError::RtrTimeout(_)
364 | StorageError::RtrDropFailure(_) => false,
365 }
366 }
367}
368
369impl ShouldTerminateGracefully for DataflowCreationError {
370 fn should_terminate_gracefully(&self) -> bool {
371 match self {
372 DataflowCreationError::SinceViolation(_)
373 | DataflowCreationError::InstanceMissing(_)
374 | DataflowCreationError::CollectionMissing(_)
375 | DataflowCreationError::ReplicaMissing(_)
376 | DataflowCreationError::MissingAsOf
377 | DataflowCreationError::EmptyAsOfForSubscribe
378 | DataflowCreationError::EmptyAsOfForCopyTo => false,
379 }
380 }
381}
382
383impl ShouldTerminateGracefully for CollectionUpdateError {
384 fn should_terminate_gracefully(&self) -> bool {
385 match self {
386 CollectionUpdateError::InstanceMissing(_)
387 | CollectionUpdateError::CollectionMissing(_) => false,
388 }
389 }
390}
391
392impl ShouldTerminateGracefully for PeekError {
393 fn should_terminate_gracefully(&self) -> bool {
394 match self {
395 PeekError::SinceViolation(_)
396 | PeekError::InstanceMissing(_)
397 | PeekError::CollectionMissing(_)
398 | PeekError::ReplicaMissing(_) => false,
399 }
400 }
401}
402
403impl ShouldTerminateGracefully for ReadPolicyError {
404 fn should_terminate_gracefully(&self) -> bool {
405 match self {
406 ReadPolicyError::InstanceMissing(_)
407 | ReadPolicyError::CollectionMissing(_)
408 | ReadPolicyError::WriteOnlyCollection(_) => false,
409 }
410 }
411}
412
413impl ShouldTerminateGracefully for TransformError {
414 fn should_terminate_gracefully(&self) -> bool {
415 match self {
416 TransformError::Internal(_)
417 | TransformError::IdentifierMissing(_)
418 | TransformError::CallerShouldPanic(_) => false,
419 }
420 }
421}
422
423impl ShouldTerminateGracefully for InstanceMissing {
424 fn should_terminate_gracefully(&self) -> bool {
425 false
426 }
427}
428
429pub(crate) fn viewable_variables<'a>(
431 catalog: &'a CatalogState,
432 session: &'a dyn SessionMetadata,
433) -> impl Iterator<Item = &'a dyn Var> {
434 session
435 .vars()
436 .iter()
437 .chain(catalog.system_config().iter())
438 .filter(|v| v.visible(session.user(), catalog.system_config()).is_ok())
439}
440
441pub fn verify_datum_desc(
443 desc: &RelationDesc,
444 rows: &mut dyn RowIterator,
445) -> Result<(), AdapterError> {
446 let Some(row) = rows.peek() else {
453 return Ok(());
454 };
455
456 let datums = row.unpack();
457 let col_types = &desc.typ().column_types;
458 if datums.len() != col_types.len() {
459 let msg = format!(
460 "internal error: row descriptor has {} columns but row has {} columns",
461 col_types.len(),
462 datums.len(),
463 );
464 return Err(AdapterError::Internal(msg));
465 }
466
467 for (i, (d, t)) in datums.iter().zip_eq(col_types).enumerate() {
468 if !d.is_instance_of(t) {
469 let msg = format!(
470 "internal error: column {} is not of expected type {:?}: {:?}",
471 i, t, d
472 );
473 return Err(AdapterError::Internal(msg));
474 }
475 }
476
477 Ok(())
478}