use std::fmt::Debug;

use mz_catalog::durable::{DurableCatalogError, FenceError};
use mz_compute_client::controller::error::{
    CollectionUpdateError, DataflowCreationError, InstanceMissing, PeekError, ReadPolicyError,
};
use mz_controller_types::ClusterId;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{exit, soft_assert_no_log};
use mz_repr::{RelationDesc, RowIterator, ScalarType};
use mz_sql::names::FullItemName;
use mz_sql::plan::StatementDesc;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::Var;
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
    CreateIndexStatement, FetchStatement, Ident, Raw, RawClusterName, RawItemName, Statement,
};
use mz_storage_types::controller::StorageError;
use mz_transform::TransformError;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::oneshot;

use crate::catalog::{Catalog, CatalogState};
use crate::command::{Command, Response};
use crate::coord::{Message, PendingTxnResponse};
use crate::error::AdapterError;
use crate::session::{EndTransactionAction, Session};
use crate::{ExecuteContext, ExecuteResponse};

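/// A oneshot channel back to the client, used to deliver the result of a
/// command along with ownership of the [`Session`].
///
/// If the client has gone away by the time the result is sent, the
/// transmitter asks the coordinator to terminate the connection. A
/// `ClientTransmitter` panics if it is dropped without having sent (or had
/// its sender taken), which guards against silently dropping a response.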
#[derive(Debug)]
pub struct ClientTransmitter<T>
where
    T: Transmittable,
    <T as Transmittable>::Allowed: 'static,
{
    tx: Option<oneshot::Sender<Response<T>>>,
    internal_cmd_tx: UnboundedSender<Message>,
    allowed: Option<&'static [T::Allowed]>,
}

impl<T: Transmittable + std::fmt::Debug> ClientTransmitter<T> {
    pub fn new(
        tx: oneshot::Sender<Response<T>>,
        internal_cmd_tx: UnboundedSender<Message>,
    ) -> ClientTransmitter<T> {
        ClientTransmitter {
            tx: Some(tx),
            internal_cmd_tx,
            allowed: None,
        }
    }

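    /// Sends `result` to the client, transferring ownership of `session`
    /// along with it.
    ///
    /// If an allow-list was installed via [`ClientTransmitter::set_allowed`],
    /// a soft assert checks that `result` is one of the allowed values. If
    /// the receiving end has hung up, the coordinator is asked to terminate
    /// the connection instead.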
    #[mz_ore::instrument(level = "debug")]
    pub fn send(mut self, result: Result<T, AdapterError>, session: Session) {
        soft_assert_no_log!(
            match (&result, self.allowed.take()) {
                (Ok(t), Some(allowed)) => allowed.contains(&t.to_allowed()),
                _ => true,
            },
            "tried to send disallowed value {result:?} through ClientTransmitter; \
            see ClientTransmitter::set_allowed"
        );

        if let Err(res) = self
            .tx
            .take()
            .expect("tx will always be `Some` unless `self` has been consumed")
            .send(Response {
                result,
                session,
                otel_ctx: OpenTelemetryContext::obtain(),
            })
        {
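            // The receiver has gone away, so the response (and the session it
            // carries) cannot be delivered; ask the coordinator to terminate
            // the connection so it can clean up.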
            self.internal_cmd_tx
                .send(Message::Command(
                    OpenTelemetryContext::obtain(),
                    Command::Terminate {
                        conn_id: res.session.conn_id().clone(),
                        tx: None,
                    },
                ))
                .expect("coordinator unexpectedly gone");
        }
    }

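    /// Consumes `self` and returns the underlying [`oneshot::Sender`],
    /// leaving the caller responsible for responding to the client.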
    pub fn take(mut self) -> oneshot::Sender<Response<T>> {
        self.tx
            .take()
            .expect("tx will always be `Some` unless `self` has been consumed")
    }

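    /// Restricts the values that may subsequently be sent through this
    /// transmitter; violations are reported by a soft assert in
    /// [`ClientTransmitter::send`].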
    pub fn set_allowed(&mut self, allowed: &'static [T::Allowed]) {
        self.allowed = Some(allowed);
    }
}

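/// A value that can be sent through a [`ClientTransmitter`], along with a
/// simplified representation (`Allowed`) used for allow-list checks.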
pub trait Transmittable {
    type Allowed: Eq + PartialEq + std::fmt::Debug;

    fn to_allowed(&self) -> Self::Allowed;
}

impl Transmittable for () {
    type Allowed = bool;

    fn to_allowed(&self) -> Self::Allowed {
        true
    }
}

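/// A client response whose transaction has finished executing and is waiting
/// to be finalized, together with the [`ExecuteContext`] it belongs to and
/// the commit/abort action to apply to the session's transaction state.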
#[derive(Debug)]
pub struct CompletedClientTransmitter {
    ctx: ExecuteContext,
    response: Result<PendingTxnResponse, AdapterError>,
    action: EndTransactionAction,
}

impl CompletedClientTransmitter {
    pub fn new(
        ctx: ExecuteContext,
        response: Result<PendingTxnResponse, AdapterError>,
        action: EndTransactionAction,
    ) -> Self {
        CompletedClientTransmitter {
            ctx,
            response,
            action,
        }
    }

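    /// Ends the session's transaction with the recorded action and returns
    /// the [`ExecuteContext`] together with the response to send to the
    /// client, with any session variables changed by the transaction folded
    /// into that response.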
    pub fn finalize(mut self) -> (ExecuteContext, Result<ExecuteResponse, AdapterError>) {
        let changed = self
            .ctx
            .session_mut()
            .vars_mut()
            .end_transaction(self.action);

        let response = self.response.map(|mut r| {
            r.extend_params(changed);
            ExecuteResponse::from(r)
        });

        (self.ctx, response)
    }
}

impl<T: Transmittable> Drop for ClientTransmitter<T> {
    fn drop(&mut self) {
        if self.tx.is_some() {
            panic!("client transmitter dropped without send")
        }
    }
}

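/// Constructs the SQL for a `CREATE INDEX` statement named `index_name` on
/// `view_name` in `cluster_id`, keyed by the columns of `view_desc` at
/// positions `keys`. Columns without an unambiguous name are referenced by
/// their ordinal position.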
pub fn index_sql(
    index_name: String,
    cluster_id: ClusterId,
    view_name: FullItemName,
    view_desc: &RelationDesc,
    keys: &[usize],
) -> String {
    use mz_sql::ast::{Expr, Value};

    CreateIndexStatement::<Raw> {
        name: Some(Ident::new_unchecked(index_name)),
        on_name: RawItemName::Name(mz_sql::normalize::unresolve(view_name)),
        in_cluster: Some(RawClusterName::Resolved(cluster_id.to_string())),
        key_parts: Some(
            keys.iter()
                .map(|i| match view_desc.get_unambiguous_name(*i) {
                    Some(n) => Expr::Identifier(vec![Ident::new_unchecked(n.to_string())]),
                    _ => Expr::Value(Value::Number((i + 1).to_string())),
                })
                .collect(),
        ),
        with_options: vec![],
        if_not_exists: false,
    }
    .to_ast_string_stable()
}

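/// Creates a description of the statement `stmt`.
///
/// A `FETCH` statement is described via the portal of the cursor it names,
/// with its parameter types cleared; any other statement is resolved against
/// the catalog and described by the SQL planner.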
pub fn describe(
    catalog: &Catalog,
    stmt: Statement<Raw>,
    param_types: &[Option<ScalarType>],
    session: &Session,
) -> Result<StatementDesc, AdapterError> {
    match stmt {
        Statement::Fetch(FetchStatement { ref name, .. }) => {
            match session
                .get_portal_unverified(name.as_str())
                .map(|p| p.desc.clone())
            {
                Some(mut desc) => {
                    desc.param_types = Vec::new();
                    Ok(desc)
                }
                None => Err(AdapterError::UnknownCursor(name.to_string())),
            }
        }
        _ => {
            let catalog = &catalog.for_session(session);
            let (stmt, _) = mz_sql::names::resolve(catalog, stmt)?;
            Ok(mz_sql::plan::describe(
                session.pcx(),
                catalog,
                stmt,
                param_types,
            )?)
        }
    }
}

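/// Extension methods for [`Result`] that turn errors signalling an expected
/// shutdown into a graceful process exit instead of a panic.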
pub trait ResultExt<T> {
    /// Unwraps the `Ok` value or terminates the process: gracefully if the
    /// error indicates shutdown is expected, otherwise with a panic that
    /// includes `context`.
    fn unwrap_or_terminate(self, context: &str) -> T;

    /// Terminates the process gracefully if the error indicates shutdown is
    /// expected; otherwise returns `self` unchanged.
    fn maybe_terminate(self, context: &str) -> Self;
}

impl<T, E> ResultExt<T> for Result<T, E>
where
    E: ShouldTerminateGracefully + Debug,
{
    fn unwrap_or_terminate(self, context: &str) -> T {
        match self {
            Ok(t) => t,
            Err(e) if e.should_terminate_gracefully() => exit!(0, "{context}: {e:?}"),
            Err(e) => panic!("{context}: {e:?}"),
        }
    }

    fn maybe_terminate(self, context: &str) -> Self {
        if let Err(e) = &self {
            if e.should_terminate_gracefully() {
                exit!(0, "{context}: {e:?}");
            }
        }

        self
    }
}

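/// Whether an error, when surfaced to [`ResultExt::unwrap_or_terminate`] or
/// [`ResultExt::maybe_terminate`], should cause the process to exit
/// gracefully rather than panic. In the implementations below, only a
/// deploy-generation fence ([`FenceError::DeployGeneration`]) qualifies.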
trait ShouldTerminateGracefully {
    fn should_terminate_gracefully(&self) -> bool;
}

impl ShouldTerminateGracefully for AdapterError {
    fn should_terminate_gracefully(&self) -> bool {
        match self {
            AdapterError::Catalog(e) => e.should_terminate_gracefully(),
            _ => false,
        }
    }
}

impl ShouldTerminateGracefully for mz_catalog::memory::error::Error {
    fn should_terminate_gracefully(&self) -> bool {
        match &self.kind {
            mz_catalog::memory::error::ErrorKind::Durable(e) => e.should_terminate_gracefully(),
            _ => false,
        }
    }
}

impl ShouldTerminateGracefully for mz_catalog::durable::CatalogError {
    fn should_terminate_gracefully(&self) -> bool {
        match &self {
            Self::Durable(e) => e.should_terminate_gracefully(),
            _ => false,
        }
    }
}

impl ShouldTerminateGracefully for DurableCatalogError {
    fn should_terminate_gracefully(&self) -> bool {
        match self {
            DurableCatalogError::Fence(err) => err.should_terminate_gracefully(),
            DurableCatalogError::IncompatibleDataVersion { .. }
            | DurableCatalogError::IncompatiblePersistVersion { .. }
            | DurableCatalogError::Proto(_)
            | DurableCatalogError::Uninitialized
            | DurableCatalogError::NotWritable(_)
            | DurableCatalogError::DuplicateKey
            | DurableCatalogError::UniquenessViolation
            | DurableCatalogError::Storage(_)
            | DurableCatalogError::Internal(_) => false,
        }
    }
}

impl ShouldTerminateGracefully for FenceError {
    fn should_terminate_gracefully(&self) -> bool {
        match self {
            FenceError::DeployGeneration { .. } => true,
            FenceError::Epoch { .. } | FenceError::MigrationUpper { .. } => false,
        }
    }
}

impl<T> ShouldTerminateGracefully for StorageError<T> {
    fn should_terminate_gracefully(&self) -> bool {
        match self {
            StorageError::ResourceExhausted(_)
            | StorageError::CollectionMetadataAlreadyExists(_)
            | StorageError::PersistShardAlreadyInUse(_)
            | StorageError::PersistSchemaEvolveRace { .. }
            | StorageError::PersistInvalidSchemaEvolve { .. }
            | StorageError::TxnWalShardAlreadyExists
            | StorageError::UpdateBeyondUpper(_)
            | StorageError::ReadBeforeSince(_)
            | StorageError::InvalidUppers(_)
            | StorageError::InvalidUsage(_)
            | StorageError::CollectionIdReused(_)
            | StorageError::SinkIdReused(_)
            | StorageError::IdentifierMissing(_)
            | StorageError::IdentifierInvalid(_)
            | StorageError::IngestionInstanceMissing { .. }
            | StorageError::ExportInstanceMissing { .. }
            | StorageError::Generic(_)
            | StorageError::ReadOnly
            | StorageError::DataflowError(_)
            | StorageError::InvalidAlter { .. }
            | StorageError::ShuttingDown(_)
            | StorageError::MissingSubsourceReference { .. }
            | StorageError::RtrTimeout(_)
            | StorageError::RtrDropFailure(_) => false,
        }
    }
}

impl ShouldTerminateGracefully for DataflowCreationError {
    fn should_terminate_gracefully(&self) -> bool {
        match self {
            DataflowCreationError::SinceViolation(_)
            | DataflowCreationError::InstanceMissing(_)
            | DataflowCreationError::CollectionMissing(_)
            | DataflowCreationError::ReplicaMissing(_)
            | DataflowCreationError::MissingAsOf
            | DataflowCreationError::EmptyAsOfForSubscribe
            | DataflowCreationError::EmptyAsOfForCopyTo => false,
        }
    }
}

impl ShouldTerminateGracefully for CollectionUpdateError {
    fn should_terminate_gracefully(&self) -> bool {
        match self {
            CollectionUpdateError::InstanceMissing(_)
            | CollectionUpdateError::CollectionMissing(_) => false,
        }
    }
}

impl ShouldTerminateGracefully for PeekError {
    fn should_terminate_gracefully(&self) -> bool {
        match self {
            PeekError::SinceViolation(_)
            | PeekError::InstanceMissing(_)
            | PeekError::CollectionMissing(_)
            | PeekError::ReplicaMissing(_) => false,
        }
    }
}

impl ShouldTerminateGracefully for ReadPolicyError {
    fn should_terminate_gracefully(&self) -> bool {
        match self {
            ReadPolicyError::InstanceMissing(_)
            | ReadPolicyError::CollectionMissing(_)
            | ReadPolicyError::WriteOnlyCollection(_) => false,
        }
    }
}

impl ShouldTerminateGracefully for TransformError {
    fn should_terminate_gracefully(&self) -> bool {
        match self {
            TransformError::Internal(_)
            | TransformError::IdentifierMissing(_)
            | TransformError::CallerShouldPanic(_) => false,
        }
    }
}

impl ShouldTerminateGracefully for InstanceMissing {
    fn should_terminate_gracefully(&self) -> bool {
        false
    }
}

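/// Returns the session and system variables that the session's user is
/// allowed to see.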
pub(crate) fn viewable_variables<'a>(
    catalog: &'a CatalogState,
    session: &'a dyn SessionMetadata,
) -> impl Iterator<Item = &'a dyn Var> {
    session
        .vars()
        .iter()
        .chain(catalog.system_config().iter())
        .filter(|v| v.visible(session.user(), catalog.system_config()).is_ok())
}

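/// Verifies that the first row of `rows` matches `desc`: it must have the
/// same number of columns, and each datum must be an instance of the
/// corresponding column type. Returns an internal error on mismatch; an
/// empty iterator trivially passes.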
pub fn verify_datum_desc(
    desc: &RelationDesc,
    rows: &mut dyn RowIterator,
) -> Result<(), AdapterError> {
    let Some(row) = rows.peek() else {
        return Ok(());
    };

    let datums = row.unpack();
    let col_types = &desc.typ().column_types;
    if datums.len() != col_types.len() {
        let msg = format!(
            "internal error: row descriptor has {} columns but row has {} columns",
            col_types.len(),
            datums.len(),
        );
        return Err(AdapterError::Internal(msg));
    }

    for (i, (d, t)) in datums.iter().zip(col_types).enumerate() {
        if !d.is_instance_of(t) {
            let msg = format!(
                "internal error: column {} is not of expected type {:?}: {:?}",
                i, t, d
            );
            return Err(AdapterError::Internal(msg));
        }
    }

    Ok(())
}