// mz_adapter/util.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
10use std::collections::{BTreeMap, BTreeSet};
11use std::fmt::Debug;
12
13use itertools::Itertools;
14use mz_catalog::durable::{DurableCatalogError, FenceError};
15use mz_compute_client::controller::error::{
16    CollectionUpdateError, DataflowCreationError, InstanceMissing, PeekError, ReadPolicyError,
17};
18use mz_controller_types::ClusterId;
19use mz_ore::tracing::OpenTelemetryContext;
20use mz_ore::{assert_none, exit, soft_assert_no_log};
21use mz_repr::{RelationDesc, RowIterator, SqlScalarType};
22use mz_sql::names::FullItemName;
23use mz_sql::plan::StatementDesc;
24use mz_sql::session::metadata::SessionMetadata;
25use mz_sql::session::vars::Var;
26use mz_sql_parser::ast::display::AstDisplay;
27use mz_sql_parser::ast::{
28    CreateIndexStatement, Ident, Raw, RawClusterName, RawItemName, Statement,
29};
30use mz_storage_types::controller::StorageError;
31use mz_transform::TransformError;
32use tokio::sync::mpsc::UnboundedSender;
33use tokio::sync::oneshot;
34
35use crate::catalog::{Catalog, CatalogState};
36use crate::command::{Command, Response};
37use crate::coord::{Message, PendingTxnResponse};
38use crate::error::AdapterError;
39use crate::session::{EndTransactionAction, Session};
40use crate::{ExecuteContext, ExecuteResponse};
41
/// Handles responding to clients.
///
/// Wraps a oneshot channel back to the client with a failsafe: if the client
/// has gone away by the time a response is sent, the connection is terminated
/// via the internal command channel instead (see [`ClientTransmitter::send`]).
#[derive(Debug)]
pub struct ClientTransmitter<T>
where
    T: Transmittable,
    <T as Transmittable>::Allowed: 'static,
{
    // Channel on which the response is delivered. `Option` so that
    // `send`/`take` can consume it while `Drop` can verify it was used.
    tx: Option<oneshot::Sender<Response<T>>>,
    // Used to issue `Command::Terminate` for the session's connection when
    // the response cannot be delivered to the client.
    internal_cmd_tx: UnboundedSender<Message>,
    /// Expresses an optional soft-assert on the set of values allowed to be
    /// sent from `self`.
    allowed: Option<&'static [T::Allowed]>,
}
55
impl<T: Transmittable + std::fmt::Debug> ClientTransmitter<T> {
    /// Creates a new client transmitter.
    pub fn new(
        tx: oneshot::Sender<Response<T>>,
        internal_cmd_tx: UnboundedSender<Message>,
    ) -> ClientTransmitter<T> {
        ClientTransmitter {
            tx: Some(tx),
            internal_cmd_tx,
            allowed: None,
        }
    }

    /// Transmits `result` to the client, returning ownership of the session
    /// `session` as well.
    ///
    /// If the receiving side of the channel has gone away (i.e. the client
    /// disconnected), the session is cleaned up by sending a
    /// `Command::Terminate` for its connection over the internal command
    /// channel.
    ///
    /// # Panics
    /// - If in `soft_assert`, `result.is_ok()`, `self.allowed.is_some()`, and
    ///   the result value is not in the set of allowed values.
    #[mz_ore::instrument(level = "debug")]
    pub fn send(mut self, result: Result<T, AdapterError>, session: Session) {
        // Guarantee that the value sent is of an allowed type.
        soft_assert_no_log!(
            match (&result, self.allowed.take()) {
                (Ok(t), Some(allowed)) => allowed.contains(&t.to_allowed()),
                _ => true,
            },
            "tried to send disallowed value {result:?} through ClientTransmitter; \
            see ClientTransmitter::set_allowed"
        );

        // If we were not able to send a message, we must clean up the session
        // ourselves. Return it to the caller for disposal.
        //
        // Note: `take()`-ing `tx` here also disarms the `Drop` impl, which
        // panics if the transmitter is dropped with `tx` still present.
        if let Err(res) = self
            .tx
            .take()
            .expect("tx will always be `Some` unless `self` has been consumed")
            .send(Response {
                result,
                session,
                otel_ctx: OpenTelemetryContext::obtain(),
            })
        {
            // `oneshot::Sender::send` returns the unsent value on failure, so
            // we recover the session from it to terminate its connection.
            self.internal_cmd_tx
                .send(Message::Command(
                    OpenTelemetryContext::obtain(),
                    Command::Terminate {
                        conn_id: res.session.conn_id().clone(),
                        tx: None,
                    },
                ))
                .expect("coordinator unexpectedly gone");
        }
    }

    /// Takes the underlying sender out of `self` without sending, consuming
    /// the transmitter. This also disarms the `Drop` panic, since `tx` is
    /// left `None`.
    ///
    /// # Panics
    /// Panics if `self` has already been consumed.
    pub fn take(mut self) -> oneshot::Sender<Response<T>> {
        self.tx
            .take()
            .expect("tx will always be `Some` unless `self` has been consumed")
    }

    /// Sets `self` so that the next call to [`Self::send`] will soft-assert
    /// that, if `Ok`, the value is one of `allowed`, as determined by
    /// [`Transmittable::to_allowed`].
    pub fn set_allowed(&mut self, allowed: &'static [T::Allowed]) {
        self.allowed = Some(allowed);
    }
}
124
/// A helper trait for [`ClientTransmitter`].
pub trait Transmittable {
    /// The type of values used to express which set of values are allowed.
    type Allowed: Eq + PartialEq + std::fmt::Debug;
    /// The conversion from the [`ClientTransmitter`]'s type to `Allowed`.
    ///
    /// The benefits of this style of trait, rather than relying on a bound on
    /// `Allowed`, are:
    /// - Not requiring a clone
    /// - The flexibility for facile implementations that do not plan to make
    ///   use of the `allowed` feature. Those types can simply implement this
    ///   trait for `bool`, and return `true`. However, it might not be
    ///   semantically appropriate to expose `From<&Self> for bool`.
    fn to_allowed(&self) -> Self::Allowed;
}
140
/// The trivial implementation: `()` does not participate in the `allowed`
/// mechanism, so `to_allowed` unconditionally reports `true`.
impl Transmittable for () {
    type Allowed = bool;

    fn to_allowed(&self) -> Self::Allowed {
        true
    }
}
148
/// `ClientTransmitter` with a response to send.
#[derive(Debug)]
pub struct CompletedClientTransmitter {
    // Execution context whose session is updated and which ultimately
    // receives the response.
    ctx: ExecuteContext,
    // The pending transaction response (or error) awaiting finalization.
    response: Result<PendingTxnResponse, AdapterError>,
    // How the transaction ended; applied to the session's variables during
    // `finalize`.
    action: EndTransactionAction,
}
156
157impl CompletedClientTransmitter {
158    /// Creates a new completed client transmitter.
159    pub fn new(
160        ctx: ExecuteContext,
161        response: Result<PendingTxnResponse, AdapterError>,
162        action: EndTransactionAction,
163    ) -> Self {
164        CompletedClientTransmitter {
165            ctx,
166            response,
167            action,
168        }
169    }
170
171    /// Returns the execute context to be finalized, and the result to send it.
172    pub fn finalize(mut self) -> (ExecuteContext, Result<ExecuteResponse, AdapterError>) {
173        let changed = self
174            .ctx
175            .session_mut()
176            .vars_mut()
177            .end_transaction(self.action);
178
179        // Append any parameters that changed to the response.
180        let response = self.response.map(|mut r| {
181            r.extend_params(changed);
182            ExecuteResponse::from(r)
183        });
184
185        (self.ctx, response)
186    }
187}
188
189impl<T: Transmittable> Drop for ClientTransmitter<T> {
190    fn drop(&mut self) {
191        if self.tx.is_some() {
192            panic!("client transmitter dropped without send")
193        }
194    }
195}
196
197// TODO(benesch): constructing the canonical CREATE INDEX statement should be
198// the responsibility of the SQL package.
199pub fn index_sql(
200    index_name: String,
201    cluster_id: ClusterId,
202    view_name: FullItemName,
203    view_desc: &RelationDesc,
204    keys: &[usize],
205) -> String {
206    use mz_sql::ast::{Expr, Value};
207
208    CreateIndexStatement::<Raw> {
209        name: Some(Ident::new_unchecked(index_name)),
210        on_name: RawItemName::Name(mz_sql::normalize::unresolve(view_name)),
211        in_cluster: Some(RawClusterName::Resolved(cluster_id.to_string())),
212        key_parts: Some(
213            keys.iter()
214                .map(|i| match view_desc.get_unambiguous_name(*i) {
215                    Some(n) => Expr::Identifier(vec![Ident::new_unchecked(n.to_string())]),
216                    _ => Expr::Value(Value::Number((i + 1).to_string())),
217                })
218                .collect(),
219        ),
220        with_options: vec![],
221        if_not_exists: false,
222    }
223    .to_ast_string_stable()
224}
225
226/// Creates a description of the statement `stmt`.
227pub fn describe(
228    catalog: &Catalog,
229    stmt: Statement<Raw>,
230    param_types: &[Option<SqlScalarType>],
231    session: &Session,
232) -> Result<StatementDesc, AdapterError> {
233    let catalog = &catalog.for_session(session);
234    let (stmt, _) = mz_sql::names::resolve(catalog, stmt)?;
235    Ok(mz_sql::plan::describe(
236        session.pcx(),
237        catalog,
238        stmt,
239        param_types,
240    )?)
241}
242
/// Extension methods for [`Result`] that terminate the process on errors
/// that indicate the process should shut down rather than panic.
pub trait ResultExt<T> {
    /// Like [`Result::expect`], but terminates the process with `halt` or
    /// exit code 0 instead of `panic` if the error indicates that it should
    /// cause a halt or graceful termination.
    fn unwrap_or_terminate(self, context: &str) -> T;

    /// Terminates the process with `halt` or exit code 0 if `self` is an
    /// error that should halt or cause graceful termination. Otherwise,
    /// does nothing.
    fn maybe_terminate(self, context: &str) -> Self;
}
254
255impl<T, E> ResultExt<T> for Result<T, E>
256where
257    E: ShouldTerminateGracefully + Debug,
258{
259    fn unwrap_or_terminate(self, context: &str) -> T {
260        match self {
261            Ok(t) => t,
262            Err(e) if e.should_terminate_gracefully() => exit!(0, "{context}: {e:?}"),
263            Err(e) => panic!("{context}: {e:?}"),
264        }
265    }
266
267    fn maybe_terminate(self, context: &str) -> Self {
268        if let Err(e) = &self {
269            if e.should_terminate_gracefully() {
270                exit!(0, "{context}: {e:?}");
271            }
272        }
273
274        self
275    }
276}
277
/// A trait for errors that should terminate gracefully rather than panic
/// the process.
///
/// Used by [`ResultExt`] to decide between `exit!(0, ..)` and `panic!`.
trait ShouldTerminateGracefully {
    /// Reports whether the error should terminate the process gracefully
    /// rather than panic.
    fn should_terminate_gracefully(&self) -> bool;
}
285
286impl ShouldTerminateGracefully for AdapterError {
287    fn should_terminate_gracefully(&self) -> bool {
288        match self {
289            AdapterError::Catalog(e) => e.should_terminate_gracefully(),
290            _ => false,
291        }
292    }
293}
294
295impl ShouldTerminateGracefully for mz_catalog::memory::error::Error {
296    fn should_terminate_gracefully(&self) -> bool {
297        match &self.kind {
298            mz_catalog::memory::error::ErrorKind::Durable(e) => e.should_terminate_gracefully(),
299            _ => false,
300        }
301    }
302}
303
304impl ShouldTerminateGracefully for mz_catalog::durable::CatalogError {
305    fn should_terminate_gracefully(&self) -> bool {
306        match &self {
307            Self::Durable(e) => e.should_terminate_gracefully(),
308            _ => false,
309        }
310    }
311}
312
impl ShouldTerminateGracefully for DurableCatalogError {
    fn should_terminate_gracefully(&self) -> bool {
        // The match is deliberately exhaustive (no `_` arm) so that adding a
        // new variant forces an explicit decision here.
        match self {
            // Fence errors decide for themselves; see the `FenceError` impl.
            DurableCatalogError::Fence(err) => err.should_terminate_gracefully(),
            DurableCatalogError::IncompatibleDataVersion { .. }
            | DurableCatalogError::IncompatiblePersistVersion { .. }
            | DurableCatalogError::Proto(_)
            | DurableCatalogError::Uninitialized
            | DurableCatalogError::NotWritable(_)
            | DurableCatalogError::DuplicateKey
            | DurableCatalogError::UniquenessViolation
            | DurableCatalogError::Storage(_)
            | DurableCatalogError::Internal(_) => false,
        }
    }
}
329
330impl ShouldTerminateGracefully for FenceError {
331    fn should_terminate_gracefully(&self) -> bool {
332        match self {
333            FenceError::DeployGeneration { .. } => true,
334            FenceError::Epoch { .. } | FenceError::MigrationUpper { .. } => false,
335        }
336    }
337}
338
impl<T> ShouldTerminateGracefully for StorageError<T> {
    fn should_terminate_gracefully(&self) -> bool {
        // No storage error currently warrants graceful termination. The match
        // is deliberately exhaustive (no `_` arm) so that adding a new
        // variant forces an explicit decision here.
        match self {
            StorageError::ResourceExhausted(_)
            | StorageError::CollectionMetadataAlreadyExists(_)
            | StorageError::PersistShardAlreadyInUse(_)
            | StorageError::PersistSchemaEvolveRace { .. }
            | StorageError::PersistInvalidSchemaEvolve { .. }
            | StorageError::TxnWalShardAlreadyExists
            | StorageError::UpdateBeyondUpper(_)
            | StorageError::ReadBeforeSince(_)
            | StorageError::InvalidUppers(_)
            | StorageError::InvalidUsage(_)
            | StorageError::CollectionIdReused(_)
            | StorageError::SinkIdReused(_)
            | StorageError::IdentifierMissing(_)
            | StorageError::IdentifierInvalid(_)
            | StorageError::IngestionInstanceMissing { .. }
            | StorageError::ExportInstanceMissing { .. }
            | StorageError::Generic(_)
            | StorageError::ReadOnly
            | StorageError::DataflowError(_)
            | StorageError::InvalidAlter { .. }
            | StorageError::ShuttingDown(_)
            | StorageError::MissingSubsourceReference { .. }
            | StorageError::RtrTimeout(_)
            | StorageError::RtrDropFailure(_) => false,
        }
    }
}
369
impl ShouldTerminateGracefully for DataflowCreationError {
    fn should_terminate_gracefully(&self) -> bool {
        // No dataflow-creation error warrants graceful termination. The match
        // is exhaustive so new variants force an explicit decision here.
        match self {
            DataflowCreationError::SinceViolation(_)
            | DataflowCreationError::InstanceMissing(_)
            | DataflowCreationError::CollectionMissing(_)
            | DataflowCreationError::ReplicaMissing(_)
            | DataflowCreationError::MissingAsOf
            | DataflowCreationError::EmptyAsOfForSubscribe
            | DataflowCreationError::EmptyAsOfForCopyTo => false,
        }
    }
}
383
impl ShouldTerminateGracefully for CollectionUpdateError {
    fn should_terminate_gracefully(&self) -> bool {
        // No collection-update error warrants graceful termination. The match
        // is exhaustive so new variants force an explicit decision here.
        match self {
            CollectionUpdateError::InstanceMissing(_)
            | CollectionUpdateError::CollectionMissing(_) => false,
        }
    }
}
392
impl ShouldTerminateGracefully for PeekError {
    fn should_terminate_gracefully(&self) -> bool {
        // No peek error warrants graceful termination. The match is
        // exhaustive so new variants force an explicit decision here.
        match self {
            PeekError::SinceViolation(_)
            | PeekError::InstanceMissing(_)
            | PeekError::CollectionMissing(_)
            | PeekError::ReplicaMissing(_) => false,
        }
    }
}
403
impl ShouldTerminateGracefully for ReadPolicyError {
    fn should_terminate_gracefully(&self) -> bool {
        // No read-policy error warrants graceful termination. The match is
        // exhaustive so new variants force an explicit decision here.
        match self {
            ReadPolicyError::InstanceMissing(_)
            | ReadPolicyError::CollectionMissing(_)
            | ReadPolicyError::WriteOnlyCollection(_) => false,
        }
    }
}
413
impl ShouldTerminateGracefully for TransformError {
    fn should_terminate_gracefully(&self) -> bool {
        // No transform error warrants graceful termination. The match is
        // exhaustive so new variants force an explicit decision here.
        match self {
            TransformError::Internal(_)
            | TransformError::IdentifierMissing(_)
            | TransformError::CallerShouldPanic(_) => false,
        }
    }
}
423
impl ShouldTerminateGracefully for InstanceMissing {
    fn should_terminate_gracefully(&self) -> bool {
        // A missing instance never warrants graceful termination.
        false
    }
}
429
/// Returns the viewable session and system variables.
///
/// Chains the session's variables with the catalog's system configuration,
/// keeping only those that are visible to the session's current user.
pub(crate) fn viewable_variables<'a>(
    catalog: &'a CatalogState,
    session: &'a dyn SessionMetadata,
) -> impl Iterator<Item = &'a dyn Var> {
    session
        .vars()
        .iter()
        .chain(catalog.system_config().iter())
        .filter(|v| v.visible(session.user(), catalog.system_config()).is_ok())
}
441
442/// Verify that the rows in [`RowIterator`] match the expected [`RelationDesc`].
443pub fn verify_datum_desc(
444    desc: &RelationDesc,
445    rows: &mut dyn RowIterator,
446) -> Result<(), AdapterError> {
447    // Verify the first row is of the expected type. This is often good enough to
448    // find problems.
449    //
450    // Notably it failed to find database-issues#1946 when "FETCH 2" was used in a test, instead
451    // we had to use "FETCH 1" twice.
452
453    let Some(row) = rows.peek() else {
454        return Ok(());
455    };
456
457    let datums = row.unpack();
458    let col_types = &desc.typ().column_types;
459    if datums.len() != col_types.len() {
460        let msg = format!(
461            "internal error: row descriptor has {} columns but row has {} columns",
462            col_types.len(),
463            datums.len(),
464        );
465        return Err(AdapterError::Internal(msg));
466    }
467
468    for (i, (d, t)) in datums.iter().zip_eq(col_types).enumerate() {
469        if !d.is_instance_of_sql(t) {
470            let msg = format!(
471                "internal error: column {} is not of expected type {:?}: {:?}",
472                i, t, d
473            );
474            return Err(AdapterError::Internal(msg));
475        }
476    }
477
478    Ok(())
479}
480
/// Sort items in dependency order using topological sort.
///
/// `key_fn` maps each item to a unique key, and `dependencies_fn` maps each
/// item to the set of keys it depends on. Dependencies that are not among
/// `items`, as well as self-references, are ignored. On return, `items` is
/// reordered so that every item appears after all of its dependencies.
///
/// # Panics
///
/// Panics if `key_fn` produces non-unique keys for the provided `items`.
/// Panics if there is a dependency cycle among the provided `items`.
pub fn sort_topological<T, K, FK, FD>(items: &mut Vec<T>, key_fn: FK, dependencies_fn: FD)
where
    T: Debug,
    K: Debug + Copy + Ord,
    FK: Fn(&T) -> K,
    FD: Fn(&T) -> BTreeSet<K>,
{
    // Index the items by key, panicking with the offending key on duplicates
    // so that contract violations are easy to debug.
    let mut items_by_key = BTreeMap::new();
    for item in items.drain(..) {
        let key = key_fn(&item);
        let prev = items_by_key.insert(key, item);
        assert!(
            prev.is_none(),
            "sort_topological requires unique keys, got duplicate key {key:?}"
        );
    }

    // For each item, the number of unprocessed dependencies.
    let mut in_degree = BTreeMap::<K, usize>::new();
    // For each item, the keys of items depending on it.
    let mut dependents = BTreeMap::<K, Vec<K>>::new();
    // Items that have no unprocessed dependencies.
    let mut ready = Vec::<K>::new();

    // Build the graph.
    for (&key, item) in &items_by_key {
        let mut dependencies = dependencies_fn(item);
        // Remove any dependencies not contained in `items`, as well as self-references.
        dependencies.retain(|dep| items_by_key.contains_key(dep) && *dep != key);

        in_degree.insert(key, dependencies.len());

        for dep in &dependencies {
            dependents.entry(*dep).or_default().push(key);
        }

        if dependencies.is_empty() {
            ready.push(key);
        }
    }

    // Process items in topological order, pushing back into the input Vec.
    while let Some(id) = ready.pop() {
        let item = items_by_key.remove(&id).expect("must exist");
        items.push(item);

        // Each processed item decrements its dependents' in-degrees; any
        // dependent that reaches zero becomes ready.
        if let Some(depts) = dependents.get(&id) {
            for dept in depts {
                let deg = in_degree.get_mut(dept).expect("must exist");
                *deg -= 1;
                if *deg == 0 {
                    ready.push(*dept);
                }
            }
        }
    }

    // Cycle detection: if we didn't process all items, there's a cycle.
    if !items_by_key.is_empty() {
        panic!("dependency cycle: {items_by_key:?}");
    }
}
545}