1use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12
13use itertools::Itertools;
14use mz_catalog::durable::{DurableCatalogError, FenceError};
15use mz_compute_client::controller::error::{
16 CollectionUpdateError, DataflowCreationError, InstanceMissing, PeekError, ReadPolicyError,
17};
18use mz_controller_types::ClusterId;
19use mz_ore::tracing::OpenTelemetryContext;
20use mz_ore::{assert_none, exit, soft_assert_no_log};
21use mz_repr::{RelationDesc, RowIterator, SqlScalarType};
22use mz_sql::names::FullItemName;
23use mz_sql::plan::StatementDesc;
24use mz_sql::session::metadata::SessionMetadata;
25use mz_sql::session::vars::Var;
26use mz_sql_parser::ast::display::AstDisplay;
27use mz_sql_parser::ast::{
28 CreateIndexStatement, Ident, Raw, RawClusterName, RawItemName, Statement,
29};
30use mz_storage_types::controller::StorageError;
31use mz_transform::TransformError;
32use tokio::sync::mpsc::UnboundedSender;
33use tokio::sync::oneshot;
34
35use crate::catalog::{Catalog, CatalogState};
36use crate::command::{Command, Response};
37use crate::coord::{Message, PendingTxnResponse};
38use crate::error::AdapterError;
39use crate::session::{EndTransactionAction, Session};
40use crate::{ExecuteContext, ExecuteResponse};
41
/// Handles sending a single [`Response`] back to a waiting client.
///
/// `tx` is wrapped in an `Option` so that `send`/`take` can move it out of a
/// mutable borrow; the `Drop` impl panics if the transmitter is dropped while
/// `tx` is still `Some`, i.e. without ever responding to the client.
#[derive(Debug)]
pub struct ClientTransmitter<T>
where
    T: Transmittable,
    <T as Transmittable>::Allowed: 'static,
{
    /// Channel on which the response is sent; `None` once `self` is consumed.
    tx: Option<oneshot::Sender<Response<T>>>,
    /// Channel to the coordinator, used to request connection termination if
    /// the client has gone away by the time we try to respond.
    internal_cmd_tx: UnboundedSender<Message>,
    /// If set, the only `Allowed` values that may pass through `send`
    /// (checked via a soft assertion); see [`ClientTransmitter::set_allowed`].
    allowed: Option<&'static [T::Allowed]>,
}
55
impl<T: Transmittable + std::fmt::Debug> ClientTransmitter<T> {
    /// Creates a new transmitter that will respond on `tx`, with no
    /// restriction on the values it may send.
    pub fn new(
        tx: oneshot::Sender<Response<T>>,
        internal_cmd_tx: UnboundedSender<Message>,
    ) -> ClientTransmitter<T> {
        ClientTransmitter {
            tx: Some(tx),
            internal_cmd_tx,
            allowed: None,
        }
    }

    /// Transmits `result` to the client, returning ownership of `session` to
    /// the client as well.
    ///
    /// Consumes `self`, which clears `tx` and thereby disarms the
    /// "dropped without send" panic in the `Drop` impl.
    #[mz_ore::instrument(level = "debug")]
    pub fn send(mut self, result: Result<T, AdapterError>, session: Session) {
        // Soft assertion (logs rather than aborts in production builds): if
        // an allow-list was installed via `set_allowed`, any `Ok` value must
        // map into it.
        soft_assert_no_log!(
            match (&result, self.allowed.take()) {
                (Ok(t), Some(allowed)) => allowed.contains(&t.to_allowed()),
                _ => true,
            },
            "tried to send disallowed value {result:?} through ClientTransmitter; \
            see ClientTransmitter::set_allowed"
        );

        // If the client connection has gone away (the oneshot receiver was
        // dropped), `send` hands the response back to us; ask the coordinator
        // to terminate the connection so its state gets cleaned up.
        if let Err(res) = self
            .tx
            .take()
            .expect("tx will always be `Some` unless `self` has been consumed")
            .send(Response {
                result,
                session,
                otel_ctx: OpenTelemetryContext::obtain(),
            })
        {
            self.internal_cmd_tx
                .send(Message::Command(
                    OpenTelemetryContext::obtain(),
                    Command::Terminate {
                        conn_id: res.session.conn_id().clone(),
                        tx: None,
                    },
                ))
                .expect("coordinator unexpectedly gone");
        }
    }

    /// Extracts the inner oneshot sender, consuming `self` without sending a
    /// response (and without tripping the `Drop` panic).
    pub fn take(mut self) -> oneshot::Sender<Response<T>> {
        self.tx
            .take()
            .expect("tx will always be `Some` unless `self` has been consumed")
    }

    /// Restricts the `Ok` values that may subsequently be sent to those whose
    /// [`Transmittable::to_allowed`] image appears in `allowed`. Enforced
    /// only as a soft assertion inside `send`.
    pub fn set_allowed(&mut self, allowed: &'static [T::Allowed]) {
        self.allowed = Some(allowed);
    }
}
124
/// A value that can be sent to a client through a [`ClientTransmitter`].
pub trait Transmittable {
    /// The type used to express which values of `Self` a transmitter will
    /// accept; see [`ClientTransmitter::set_allowed`].
    type Allowed: Eq + PartialEq + std::fmt::Debug;
    /// Converts `self` into its `Allowed` representation for the allow-list
    /// check performed by [`ClientTransmitter::send`].
    fn to_allowed(&self) -> Self::Allowed;
}
140
impl Transmittable for () {
    type Allowed = bool;

    // `()` carries no information, so every unit value is trivially allowed.
    fn to_allowed(&self) -> Self::Allowed {
        true
    }
}
148
/// A response to a completed transaction that is waiting to be finalized:
/// holds everything needed to end the transaction on the session and produce
/// the client-facing response.
#[derive(Debug)]
pub struct CompletedClientTransmitter {
    // The execution context whose session the transaction ends on.
    ctx: ExecuteContext,
    // The pending response, converted to an `ExecuteResponse` by `finalize`.
    response: Result<PendingTxnResponse, AdapterError>,
    // The `EndTransactionAction` to apply when ending the transaction.
    action: EndTransactionAction,
}
156
157impl CompletedClientTransmitter {
158 pub fn new(
160 ctx: ExecuteContext,
161 response: Result<PendingTxnResponse, AdapterError>,
162 action: EndTransactionAction,
163 ) -> Self {
164 CompletedClientTransmitter {
165 ctx,
166 response,
167 action,
168 }
169 }
170
171 pub fn finalize(mut self) -> (ExecuteContext, Result<ExecuteResponse, AdapterError>) {
173 let changed = self
174 .ctx
175 .session_mut()
176 .vars_mut()
177 .end_transaction(self.action);
178
179 let response = self.response.map(|mut r| {
181 r.extend_params(changed);
182 ExecuteResponse::from(r)
183 });
184
185 (self.ctx, response)
186 }
187}
188
impl<T: Transmittable> Drop for ClientTransmitter<T> {
    // Guards against accidentally dropping a transmitter without responding
    // to the client: `send` and `take` clear `tx`, so `tx` being `Some` here
    // means the response path was skipped entirely.
    fn drop(&mut self) {
        if self.tx.is_some() {
            panic!("client transmitter dropped without send")
        }
    }
}
196
197pub fn index_sql(
200 index_name: String,
201 cluster_id: ClusterId,
202 view_name: FullItemName,
203 view_desc: &RelationDesc,
204 keys: &[usize],
205) -> String {
206 use mz_sql::ast::{Expr, Value};
207
208 CreateIndexStatement::<Raw> {
209 name: Some(Ident::new_unchecked(index_name)),
210 on_name: RawItemName::Name(mz_sql::normalize::unresolve(view_name)),
211 in_cluster: Some(RawClusterName::Resolved(cluster_id.to_string())),
212 key_parts: Some(
213 keys.iter()
214 .map(|i| match view_desc.get_unambiguous_name(*i) {
215 Some(n) => Expr::Identifier(vec![Ident::new_unchecked(n.to_string())]),
216 _ => Expr::Value(Value::Number((i + 1).to_string())),
217 })
218 .collect(),
219 ),
220 with_options: vec![],
221 if_not_exists: false,
222 }
223 .to_ast_string_stable()
224}
225
226pub fn describe(
228 catalog: &Catalog,
229 stmt: Statement<Raw>,
230 param_types: &[Option<SqlScalarType>],
231 session: &Session,
232) -> Result<StatementDesc, AdapterError> {
233 let catalog = &catalog.for_session(session);
234 let (stmt, _) = mz_sql::names::resolve(catalog, stmt)?;
235 Ok(mz_sql::plan::describe(
236 session.pcx(),
237 catalog,
238 stmt,
239 param_types,
240 )?)
241}
242
/// Extension methods for [`Result`] that exit the process gracefully when the
/// error indicates an intentional shutdown (e.g. having been fenced out).
pub trait ResultExt<T> {
    /// Unwraps the `Ok` value. On error, exits the process with status 0 if
    /// the error calls for graceful termination, and panics (with `context`
    /// prepended) otherwise.
    fn unwrap_or_terminate(self, context: &str) -> T;

    /// Exits the process with status 0 if the contained error calls for
    /// graceful termination; otherwise returns `self` unchanged.
    fn maybe_terminate(self, context: &str) -> Self;
}
254
255impl<T, E> ResultExt<T> for Result<T, E>
256where
257 E: ShouldTerminateGracefully + Debug,
258{
259 fn unwrap_or_terminate(self, context: &str) -> T {
260 match self {
261 Ok(t) => t,
262 Err(e) if e.should_terminate_gracefully() => exit!(0, "{context}: {e:?}"),
263 Err(e) => panic!("{context}: {e:?}"),
264 }
265 }
266
267 fn maybe_terminate(self, context: &str) -> Self {
268 if let Err(e) = &self {
269 if e.should_terminate_gracefully() {
270 exit!(0, "{context}: {e:?}");
271 }
272 }
273
274 self
275 }
276}
277
/// Errors that may indicate the process should shut down gracefully rather
/// than crash — e.g. having been fenced out by a newer deploy generation.
trait ShouldTerminateGracefully {
    /// Reports whether this error should cause a graceful process exit.
    fn should_terminate_gracefully(&self) -> bool;
}
285
286impl ShouldTerminateGracefully for AdapterError {
287 fn should_terminate_gracefully(&self) -> bool {
288 match self {
289 AdapterError::Catalog(e) => e.should_terminate_gracefully(),
290 _ => false,
291 }
292 }
293}
294
295impl ShouldTerminateGracefully for mz_catalog::memory::error::Error {
296 fn should_terminate_gracefully(&self) -> bool {
297 match &self.kind {
298 mz_catalog::memory::error::ErrorKind::Durable(e) => e.should_terminate_gracefully(),
299 _ => false,
300 }
301 }
302}
303
304impl ShouldTerminateGracefully for mz_catalog::durable::CatalogError {
305 fn should_terminate_gracefully(&self) -> bool {
306 match &self {
307 Self::Durable(e) => e.should_terminate_gracefully(),
308 _ => false,
309 }
310 }
311}
312
impl ShouldTerminateGracefully for DurableCatalogError {
    fn should_terminate_gracefully(&self) -> bool {
        match self {
            // Fence errors may represent an intentional handoff; defer to
            // their own judgment.
            DurableCatalogError::Fence(err) => err.should_terminate_gracefully(),
            // All other durable catalog errors are genuine failures. The
            // match is deliberately exhaustive so that adding a variant
            // forces a decision here.
            DurableCatalogError::IncompatibleDataVersion { .. }
            | DurableCatalogError::IncompatiblePersistVersion { .. }
            | DurableCatalogError::Proto(_)
            | DurableCatalogError::Uninitialized
            | DurableCatalogError::NotWritable(_)
            | DurableCatalogError::DuplicateKey
            | DurableCatalogError::UniquenessViolation
            | DurableCatalogError::Storage(_)
            | DurableCatalogError::Internal(_) => false,
        }
    }
}
329
impl ShouldTerminateGracefully for FenceError {
    fn should_terminate_gracefully(&self) -> bool {
        match self {
            // Fenced out by a different deploy generation — treated as an
            // intentional handoff, so exit gracefully.
            FenceError::DeployGeneration { .. } => true,
            // Other fence errors are unexpected and should crash loudly.
            FenceError::Epoch { .. } | FenceError::MigrationUpper { .. } => false,
        }
    }
}
338
impl<T> ShouldTerminateGracefully for StorageError<T> {
    fn should_terminate_gracefully(&self) -> bool {
        // No storage error indicates an intentional shutdown. The match is
        // deliberately exhaustive so that adding a variant forces a decision
        // here rather than silently defaulting.
        match self {
            StorageError::ResourceExhausted(_)
            | StorageError::CollectionMetadataAlreadyExists(_)
            | StorageError::PersistShardAlreadyInUse(_)
            | StorageError::PersistSchemaEvolveRace { .. }
            | StorageError::PersistInvalidSchemaEvolve { .. }
            | StorageError::TxnWalShardAlreadyExists
            | StorageError::UpdateBeyondUpper(_)
            | StorageError::ReadBeforeSince(_)
            | StorageError::InvalidUppers(_)
            | StorageError::InvalidUsage(_)
            | StorageError::CollectionIdReused(_)
            | StorageError::SinkIdReused(_)
            | StorageError::IdentifierMissing(_)
            | StorageError::IdentifierInvalid(_)
            | StorageError::IngestionInstanceMissing { .. }
            | StorageError::ExportInstanceMissing { .. }
            | StorageError::Generic(_)
            | StorageError::ReadOnly
            | StorageError::DataflowError(_)
            | StorageError::InvalidAlter { .. }
            | StorageError::ShuttingDown(_)
            | StorageError::MissingSubsourceReference { .. }
            | StorageError::RtrTimeout(_)
            | StorageError::RtrDropFailure(_) => false,
        }
    }
}
369
impl ShouldTerminateGracefully for DataflowCreationError {
    fn should_terminate_gracefully(&self) -> bool {
        // No dataflow creation error indicates an intentional shutdown;
        // deliberately exhaustive so new variants force a decision here.
        match self {
            DataflowCreationError::SinceViolation(_)
            | DataflowCreationError::InstanceMissing(_)
            | DataflowCreationError::CollectionMissing(_)
            | DataflowCreationError::ReplicaMissing(_)
            | DataflowCreationError::MissingAsOf
            | DataflowCreationError::EmptyAsOfForSubscribe
            | DataflowCreationError::EmptyAsOfForCopyTo => false,
        }
    }
}
383
impl ShouldTerminateGracefully for CollectionUpdateError {
    fn should_terminate_gracefully(&self) -> bool {
        // No collection update error indicates an intentional shutdown;
        // deliberately exhaustive so new variants force a decision here.
        match self {
            CollectionUpdateError::InstanceMissing(_)
            | CollectionUpdateError::CollectionMissing(_) => false,
        }
    }
}
392
impl ShouldTerminateGracefully for PeekError {
    fn should_terminate_gracefully(&self) -> bool {
        // No peek error indicates an intentional shutdown; deliberately
        // exhaustive so new variants force a decision here.
        match self {
            PeekError::SinceViolation(_)
            | PeekError::InstanceMissing(_)
            | PeekError::CollectionMissing(_)
            | PeekError::ReplicaMissing(_) => false,
        }
    }
}
403
impl ShouldTerminateGracefully for ReadPolicyError {
    fn should_terminate_gracefully(&self) -> bool {
        // No read policy error indicates an intentional shutdown;
        // deliberately exhaustive so new variants force a decision here.
        match self {
            ReadPolicyError::InstanceMissing(_)
            | ReadPolicyError::CollectionMissing(_)
            | ReadPolicyError::WriteOnlyCollection(_) => false,
        }
    }
}
413
impl ShouldTerminateGracefully for TransformError {
    fn should_terminate_gracefully(&self) -> bool {
        // No transform error indicates an intentional shutdown;
        // deliberately exhaustive so new variants force a decision here.
        match self {
            TransformError::Internal(_)
            | TransformError::IdentifierMissing(_)
            | TransformError::CallerShouldPanic(_) => false,
        }
    }
}
423
impl ShouldTerminateGracefully for InstanceMissing {
    // A missing compute instance is a genuine error, never a graceful exit.
    fn should_terminate_gracefully(&self) -> bool {
        false
    }
}
429
/// Returns the session and system variables that the session's user is
/// permitted to view — session variables first, then the system
/// configuration, each filtered by the variable's own visibility check.
pub(crate) fn viewable_variables<'a>(
    catalog: &'a CatalogState,
    session: &'a dyn SessionMetadata,
) -> impl Iterator<Item = &'a dyn Var> {
    session
        .vars()
        .iter()
        .chain(catalog.system_config().iter())
        .filter(|v| v.visible(session.user(), catalog.system_config()).is_ok())
}
441
442pub fn verify_datum_desc(
444 desc: &RelationDesc,
445 rows: &mut dyn RowIterator,
446) -> Result<(), AdapterError> {
447 let Some(row) = rows.peek() else {
454 return Ok(());
455 };
456
457 let datums = row.unpack();
458 let col_types = &desc.typ().column_types;
459 if datums.len() != col_types.len() {
460 let msg = format!(
461 "internal error: row descriptor has {} columns but row has {} columns",
462 col_types.len(),
463 datums.len(),
464 );
465 return Err(AdapterError::Internal(msg));
466 }
467
468 for (i, (d, t)) in datums.iter().zip_eq(col_types).enumerate() {
469 if !d.is_instance_of_sql(t) {
470 let msg = format!(
471 "internal error: column {} is not of expected type {:?}: {:?}",
472 i, t, d
473 );
474 return Err(AdapterError::Internal(msg));
475 }
476 }
477
478 Ok(())
479}
480
/// Sorts `items` in place into a topological order: every item appears after
/// all of the items it depends on (per `dependencies_fn`, keyed by `key_fn`).
///
/// Dependencies on keys not present in `items`, and self-dependencies, are
/// ignored. Keys must be unique.
///
/// # Panics
///
/// Panics if two items share a key or if the dependency graph has a cycle.
pub fn sort_topological<T, K, FK, FD>(items: &mut Vec<T>, key_fn: FK, dependencies_fn: FD)
where
    T: Debug,
    K: Debug + Copy + Ord,
    FK: Fn(&T) -> K,
    FD: Fn(&T) -> BTreeSet<K>,
{
    // Index every item by its key; keys must be unique.
    let mut remaining = BTreeMap::new();
    for item in items.drain(..) {
        let key = key_fn(&item);
        let previous = remaining.insert(key, item);
        assert!(previous.is_none(), "duplicate key: {previous:?}");
    }

    // Kahn's algorithm: for each item track how many of its dependencies are
    // still unsorted, and which items depend on it.
    let mut unsorted_deps = BTreeMap::<K, usize>::new();
    let mut rdeps = BTreeMap::<K, Vec<K>>::new();
    // Items whose dependencies are all sorted, ready to be emitted.
    let mut stack = Vec::<K>::new();

    for (&key, item) in &remaining {
        // Dependencies on absent items and on the item itself are ignored.
        let mut deps = dependencies_fn(item);
        deps.retain(|dep| remaining.contains_key(dep) && *dep != key);

        unsorted_deps.insert(key, deps.len());
        for dep in &deps {
            rdeps.entry(*dep).or_default().push(key);
        }
        if deps.is_empty() {
            stack.push(key);
        }
    }

    // Repeatedly emit a ready item, unblocking the items that depend on it.
    while let Some(key) = stack.pop() {
        let item = remaining.remove(&key).expect("must exist");
        items.push(item);

        if let Some(dependers) = rdeps.get(&key) {
            for depender in dependers {
                let count = unsorted_deps.get_mut(depender).expect("must exist");
                *count -= 1;
                if *count == 0 {
                    stack.push(*depender);
                }
            }
        }
    }

    // Anything left over participates in a dependency cycle.
    if !remaining.is_empty() {
        panic!("dependency cycle: {remaining:?}");
    }
}