Skip to main content

mz_adapter/
session.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Per-connection configuration parameters and state.
11
12#![warn(missing_docs)]
13
14use std::collections::btree_map::Entry;
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Debug;
17use std::future::Future;
18use std::mem;
19use std::net::IpAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use derivative::Derivative;
25use itertools::Itertools;
26use mz_adapter_types::connection::ConnectionId;
27use mz_auth::AuthenticatorKind;
28use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
29use mz_controller_types::ClusterId;
30use mz_ore::metrics::{MetricsFutureExt, MetricsRegistry};
31use mz_ore::now::{EpochMillis, NowFn};
32use mz_pgwire_common::Format;
33use mz_repr::role_id::RoleId;
34use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
35use mz_repr::{CatalogItemId, Datum, Row, RowIterator, SqlScalarType, TimestampManipulation};
36use mz_sql::ast::{AstInfo, Raw, Statement, TransactionAccessMode};
37use mz_sql::plan::{Params, PlanContext, QueryWhen, StatementDesc};
38use mz_sql::session::metadata::SessionMetadata;
39use mz_sql::session::user::{
40    INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER, RoleMetadata, SYSTEM_USER, User,
41};
42use mz_sql::session::vars::IsolationLevel;
43pub use mz_sql::session::vars::{
44    DEFAULT_DATABASE_NAME, EndTransactionAction, SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION,
45    SERVER_PATCH_VERSION, SessionVars, Var,
46};
47use mz_sql_parser::ast::TransactionIsolationLevel;
48use mz_storage_client::client::TableData;
49use mz_storage_types::sources::Timeline;
50use qcell::{QCell, QCellOwner};
51use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
52use tokio::sync::watch;
53use uuid::Uuid;
54
55use crate::catalog::CatalogState;
56use crate::client::RecordFirstRowStream;
57use crate::coord::appends::BuiltinTableAppendNotify;
58use crate::coord::in_memory_oracle::InMemoryTimestampOracle;
59use crate::coord::peek::PeekResponseUnary;
60use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
61use crate::coord::{Coordinator, ExplainContext};
62use crate::error::AdapterError;
63use crate::metrics::{Metrics, SessionMetrics};
64use crate::statement_logging::PreparedStatementLoggingInfo;
65use crate::{AdapterNotice, ExecuteContext};
66use mz_catalog::durable::Snapshot;
67
/// Connection ID reserved for sessions created by `Session::dummy`.
///
/// `Session::new` asserts that sessions for real connections never use it.
const DUMMY_CONNECTION_ID: ConnectionId = ConnectionId::Static(0);
69
/// A session holds per-connection state.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Session<T = mz_repr::Timestamp>
where
    T: Debug + Clone + Send + Sync,
{
    /// The ID of the connection this session belongs to.
    conn_id: ConnectionId,
    /// A globally unique identifier for the session. Not to be confused
    /// with `conn_id`, which may be reused.
    uuid: Uuid,
    /// Prepared statements registered on this session, keyed by name.
    prepared_statements: BTreeMap<String, PreparedStatement>,
    /// Portals bound on this session, keyed by name. All portals are
    /// destroyed whenever the transaction is cleared.
    portals: BTreeMap<String, Portal>,
    /// The status of the session's current transaction.
    transaction: TransactionStatus<T>,
    // NOTE(review): in the code visible here this field is only ever set to
    // `None`; `SessionMetadata::pcx` reads the plan context from the active
    // transaction instead. Confirm whether the field is still needed.
    pcx: Option<PlanContext>,
    /// Metrics handle supplied when the session was created.
    metrics: SessionMetrics,
    // NOTE(review): initialized to `None`; its use is not visible in this
    // part of the file.
    #[derivative(Debug = "ignore")]
    builtin_updates: Option<BuiltinTableAppendNotify>,

    /// The role metadata of the current session.
    ///
    /// Invariant: role_metadata must be `Some` after the user has
    /// successfully connected to and authenticated with Materialize.
    ///
    /// Prefer using this value over [`SessionConfig::user`].
    //
    // It would be better for this not to be an Option, but the
    // `Session` is initialized before the user has connected to
    // Materialize and is able to look up the `RoleMetadata`. The `Session`
    // is also used to return an error when no role exists and
    // therefore there is no valid `RoleMetadata`.
    role_metadata: Option<RoleMetadata>,
    /// The peer address of the client, if known.
    client_ip: Option<IpAddr>,
    /// The session variables.
    vars: SessionVars,
    /// Sending half of the notice channel. The session keeps it so that the
    /// receiving half never observes a closed channel.
    notices_tx: mpsc::UnboundedSender<AdapterNotice>,
    /// Receiving half of the notice channel, drained by `recv_notice` and
    /// `drain_notices`.
    notices_rx: mpsc::UnboundedReceiver<AdapterNotice>,
    /// The ID that will be assigned to the next transaction started on this
    /// session. Incremented with wrapping arithmetic.
    next_transaction_id: TransactionId,
    /// A randomly generated key, chosen when the session is created.
    secret_key: u32,
    /// Optional receiver for updates to the user's external metadata.
    external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    // Token allowing us to access `Arc<QCell<StatementLogging>>`
    // metadata. We want these to be reference-counted, because the same
    // statement might be referenced from multiple portals simultaneously.
    //
    // However, they can't be `Rc<RefCell<StatementLogging>>`, because
    // the `Session` is sent around to different threads.
    //
    // On the other hand, they don't need to be
    // `Arc<Mutex<StatementLogging>>`, because they will always be
    // accessed from the same thread that the `Session` is currently
    // on. We express this by gating access with this token.
    #[derivative(Debug = "ignore")]
    qcell_owner: QCellOwner,
    /// In-memory timestamp oracles keyed by timeline. Starts out empty.
    session_oracles: BTreeMap<Timeline, InMemoryTimestampOracle<T, NowFn<T>>>,
    /// Incremented when session state that is relevant to prepared statement planning changes.
    /// Currently, only changes to `portals` are tracked. Changes to `prepared_statements` don't
    /// need to be tracked, because prepared statements can't depend on other prepared statements.
    /// TODO: We might want to track changes also to session variables.
    /// (`Catalog::transient_revision` similarly tracks changes on the catalog side.)
    state_revision: u64,
}
130
131impl<T> SessionMetadata for Session<T>
132where
133    T: Debug + Clone + Send + Sync,
134    T: TimestampManipulation,
135{
136    fn conn_id(&self) -> &ConnectionId {
137        &self.conn_id
138    }
139
140    fn client_ip(&self) -> Option<&IpAddr> {
141        self.client_ip.as_ref()
142    }
143
144    fn pcx(&self) -> &PlanContext {
145        &self
146            .transaction()
147            .inner()
148            .expect("no active transaction")
149            .pcx
150    }
151
152    fn role_metadata(&self) -> &RoleMetadata {
153        self.role_metadata
154            .as_ref()
155            .expect("role_metadata invariant violated")
156    }
157
158    fn vars(&self) -> &SessionVars {
159        &self.vars
160    }
161}
162
/// Data structure suitable for passing to other threads that need access to some common Session
/// properties.
///
/// `Session::meta` builds it by cloning the corresponding values out of a
/// `Session`.
#[derive(Debug)]
pub struct SessionMeta {
    /// Copy of the session's connection ID.
    conn_id: ConnectionId,
    /// Copy of the client's peer address, if any.
    client_ip: Option<IpAddr>,
    /// Copy of the plan context that was current when the snapshot was taken.
    pcx: PlanContext,
    /// Copy of the session's role metadata.
    role_metadata: RoleMetadata,
    /// Copy of the session variables.
    vars: SessionVars,
}
173
174impl SessionMetadata for SessionMeta {
175    fn vars(&self) -> &SessionVars {
176        &self.vars
177    }
178
179    fn conn_id(&self) -> &ConnectionId {
180        &self.conn_id
181    }
182
183    fn client_ip(&self) -> Option<&IpAddr> {
184        self.client_ip.as_ref()
185    }
186
187    fn pcx(&self) -> &PlanContext {
188        &self.pcx
189    }
190
191    fn role_metadata(&self) -> &RoleMetadata {
192        &self.role_metadata
193    }
194}
195
/// Configures a new [`Session`].
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// The connection ID for the session.
    ///
    /// May be reused after the session terminates.
    pub conn_id: ConnectionId,
    /// A universally unique identifier for the session, across all processes,
    /// regions, and all time.
    ///
    /// Must not be reused, even after the session terminates.
    pub uuid: Uuid,
    /// The peer address of the client, if known.
    pub client_ip: Option<IpAddr>,
    /// The name of the user associated with the session.
    pub user: String,
    /// An optional receiver that the session will periodically check for
    /// updates to a user's external metadata.
    pub external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    /// The Helm chart version, if any. It is handed to the session variables
    /// when the session is constructed.
    pub helm_chart_version: Option<String>,
    /// The authenticator that authenticated this user, if any.
    pub authenticator_kind: AuthenticatorKind,
}
220
221impl<T: TimestampManipulation> Session<T> {
222    /// Creates a new session for the specified connection ID.
223    pub(crate) fn new(
224        build_info: &'static BuildInfo,
225        config: SessionConfig,
226        metrics: SessionMetrics,
227    ) -> Session<T> {
228        assert_ne!(config.conn_id, DUMMY_CONNECTION_ID);
229        Self::new_internal(build_info, config, metrics)
230    }
231
232    /// Returns a reference-less collection of data usable by other tasks that don't have ownership
233    /// of the Session.
234    pub fn meta(&self) -> SessionMeta {
235        SessionMeta {
236            conn_id: self.conn_id().clone(),
237            client_ip: self.client_ip().copied(),
238            pcx: self.pcx().clone(),
239            role_metadata: self.role_metadata().clone(),
240            vars: self.vars.clone(),
241        }
242
243        // TODO: soft_assert that these are the same as Session.
244    }
245
246    /// Creates new statement logging metadata for a one-off
247    /// statement.
248    // Normally, such logging information would be created as part of
249    // allocating a new prepared statement, and a refcounted handle
250    // would be copied from that prepared statement to portals during
251    // binding. However, we also support (via `Command::declare`)
252    // binding a statement directly to a portal without creating an
253    // intermediate prepared statement. Thus, for those cases, a
254    // mechanism for generating the logging metadata directly is needed.
255    pub(crate) fn mint_logging<A: AstInfo>(
256        &self,
257        raw_sql: String,
258        stmt: Option<&Statement<A>>,
259        now: EpochMillis,
260    ) -> Arc<QCell<PreparedStatementLoggingInfo>> {
261        Arc::new(QCell::new(
262            &self.qcell_owner,
263            PreparedStatementLoggingInfo::still_to_log(
264                raw_sql,
265                stmt,
266                now,
267                "".to_string(),
268                self.uuid,
269                false,
270            ),
271        ))
272    }
273
274    pub(crate) fn qcell_ro<'a, T2: 'a>(&'a self, cell: &'a Arc<QCell<T2>>) -> &'a T2 {
275        self.qcell_owner.ro(&*cell)
276    }
277
278    pub(crate) fn qcell_rw<'a, T2: 'a>(&'a mut self, cell: &'a Arc<QCell<T2>>) -> &'a mut T2 {
279        self.qcell_owner.rw(&*cell)
280    }
281
282    /// Returns a unique ID for the session.
283    /// Not to be confused with `connection_id`, which can be reused.
284    pub fn uuid(&self) -> Uuid {
285        self.uuid
286    }
287
288    /// Creates a new dummy session.
289    ///
290    /// Dummy sessions are intended for use when executing queries on behalf of
291    /// the system itself, rather than on behalf of a user.
292    pub fn dummy() -> Session<T> {
293        let registry = MetricsRegistry::new();
294        let metrics = Metrics::register_into(&registry);
295        let metrics = metrics.session_metrics();
296        let mut dummy = Self::new_internal(
297            &DUMMY_BUILD_INFO,
298            SessionConfig {
299                conn_id: DUMMY_CONNECTION_ID,
300                uuid: Uuid::new_v4(),
301                user: SYSTEM_USER.name.clone(),
302                client_ip: None,
303                external_metadata_rx: None,
304                helm_chart_version: None,
305                authenticator_kind: AuthenticatorKind::None,
306            },
307            metrics,
308        );
309        dummy.initialize_role_metadata(RoleId::User(0));
310        dummy
311    }
312
313    fn new_internal(
314        build_info: &'static BuildInfo,
315        SessionConfig {
316            conn_id,
317            uuid,
318            user,
319            client_ip,
320            mut external_metadata_rx,
321            helm_chart_version,
322            authenticator_kind,
323        }: SessionConfig,
324        metrics: SessionMetrics,
325    ) -> Session<T> {
326        let (notices_tx, notices_rx) = mpsc::unbounded_channel();
327        let default_cluster = INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER.get(&user);
328        let user = User {
329            name: user,
330            internal_metadata: None,
331            external_metadata: external_metadata_rx
332                .as_mut()
333                .map(|rx| rx.borrow_and_update().clone()),
334            authenticator_kind: Some(authenticator_kind),
335        };
336        let mut vars = SessionVars::new_unchecked(build_info, user, helm_chart_version);
337        if let Some(default_cluster) = default_cluster {
338            vars.set_cluster(default_cluster.clone());
339        }
340        Session {
341            conn_id,
342            uuid,
343            transaction: TransactionStatus::Default,
344            pcx: None,
345            metrics,
346            builtin_updates: None,
347            prepared_statements: BTreeMap::new(),
348            portals: BTreeMap::new(),
349            role_metadata: None,
350            client_ip,
351            vars,
352            notices_tx,
353            notices_rx,
354            next_transaction_id: 0,
355            secret_key: rand::random(),
356            external_metadata_rx,
357            qcell_owner: QCellOwner::new(),
358            session_oracles: BTreeMap::new(),
359            state_revision: 0,
360        }
361    }
362
363    /// Returns the secret key associated with the session.
364    pub fn secret_key(&self) -> u32 {
365        self.secret_key
366    }
367
368    fn new_pcx(&self, mut wall_time: DateTime<Utc>) -> PlanContext {
369        if let Some(mock_time) = self.vars().unsafe_new_transaction_wall_time() {
370            wall_time = *mock_time;
371        }
372        PlanContext::new(wall_time)
373    }
374
375    /// Starts an explicit transaction, or changes an implicit to an explicit
376    /// transaction.
377    pub fn start_transaction(
378        &mut self,
379        wall_time: DateTime<Utc>,
380        access: Option<TransactionAccessMode>,
381        isolation_level: Option<TransactionIsolationLevel>,
382    ) -> Result<(), AdapterError> {
383        // Check that current transaction state is compatible with new `access`
384        if let Some(txn) = self.transaction.inner() {
385            // `READ WRITE` prohibited if:
386            // - Currently in `READ ONLY`
387            // - Already performed a query
388            let read_write_prohibited = match txn.ops {
389                TransactionOps::Peeks { .. } | TransactionOps::Subscribe => {
390                    txn.access == Some(TransactionAccessMode::ReadOnly)
391                }
392                TransactionOps::None
393                | TransactionOps::Writes(_)
394                | TransactionOps::SingleStatement { .. }
395                | TransactionOps::DDL { .. } => false,
396            };
397
398            if read_write_prohibited && access == Some(TransactionAccessMode::ReadWrite) {
399                return Err(AdapterError::ReadWriteUnavailable);
400            }
401        }
402
403        match std::mem::take(&mut self.transaction) {
404            TransactionStatus::Default => {
405                let id = self.next_transaction_id;
406                self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
407                self.transaction = TransactionStatus::InTransaction(Transaction {
408                    pcx: self.new_pcx(wall_time),
409                    ops: TransactionOps::None,
410                    write_lock_guards: None,
411                    access,
412                    id,
413                });
414            }
415            TransactionStatus::Started(mut txn)
416            | TransactionStatus::InTransactionImplicit(mut txn)
417            | TransactionStatus::InTransaction(mut txn) => {
418                if access.is_some() {
419                    txn.access = access;
420                }
421                self.transaction = TransactionStatus::InTransaction(txn);
422            }
423            TransactionStatus::Failed(_) => unreachable!(),
424        };
425
426        if let Some(isolation_level) = isolation_level {
427            self.vars
428                .set_local_transaction_isolation(isolation_level.into());
429        }
430
431        Ok(())
432    }
433
434    /// Starts either a single statement or implicit transaction based on the
435    /// number of statements, but only if no transaction has been started already.
436    pub fn start_transaction_implicit(&mut self, wall_time: DateTime<Utc>, stmts: usize) {
437        if let TransactionStatus::Default = self.transaction {
438            let id = self.next_transaction_id;
439            self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
440            let txn = Transaction {
441                pcx: self.new_pcx(wall_time),
442                ops: TransactionOps::None,
443                write_lock_guards: None,
444                access: None,
445                id,
446            };
447            match stmts {
448                1 => self.transaction = TransactionStatus::Started(txn),
449                n if n > 1 => self.transaction = TransactionStatus::InTransactionImplicit(txn),
450                _ => {}
451            }
452        }
453    }
454
455    /// Starts a single statement transaction, but only if no transaction has been started already.
456    pub fn start_transaction_single_stmt(&mut self, wall_time: DateTime<Utc>) {
457        self.start_transaction_implicit(wall_time, 1);
458    }
459
460    /// Clears a transaction, setting its state to Default and destroying all
461    /// portals. Returned are:
462    /// - sinks that were started in this transaction and need to be dropped
463    /// - the cleared transaction so its operations can be handled
464    ///
465    /// The [Postgres protocol docs](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) specify:
466    /// > a named portal object lasts till the end of the current transaction
467    /// and
468    /// > An unnamed portal is destroyed at the end of the transaction
469    #[must_use]
470    pub fn clear_transaction(&mut self) -> TransactionStatus<T> {
471        self.portals.clear();
472        self.pcx = None;
473        self.state_revision += 1;
474        mem::take(&mut self.transaction)
475    }
476
477    /// Marks the current transaction as failed.
478    pub fn fail_transaction(mut self) -> Self {
479        match self.transaction {
480            TransactionStatus::Default => unreachable!(),
481            TransactionStatus::Started(txn)
482            | TransactionStatus::InTransactionImplicit(txn)
483            | TransactionStatus::InTransaction(txn) => {
484                self.transaction = TransactionStatus::Failed(txn);
485            }
486            TransactionStatus::Failed(_) => {}
487        };
488        self
489    }
490
491    /// Returns the current transaction status.
492    pub fn transaction(&self) -> &TransactionStatus<T> {
493        &self.transaction
494    }
495
496    /// Returns the current transaction status.
497    pub fn transaction_mut(&mut self) -> &mut TransactionStatus<T> {
498        &mut self.transaction
499    }
500
501    /// Returns the session's transaction code.
502    pub fn transaction_code(&self) -> TransactionCode {
503        self.transaction().into()
504    }
505
506    /// Adds operations to the current transaction. An error is produced if
507    /// they cannot be merged (i.e., a timestamp-dependent read cannot be
508    /// merged to an insert).
509    pub fn add_transaction_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
510        self.transaction.add_ops(add_ops)
511    }
512
513    /// Returns a channel on which to send notices to the session.
514    pub fn retain_notice_transmitter(&self) -> UnboundedSender<AdapterNotice> {
515        self.notices_tx.clone()
516    }
517
518    /// Adds a notice to the session.
519    pub fn add_notice(&self, notice: AdapterNotice) {
520        self.add_notices([notice])
521    }
522
523    /// Adds multiple notices to the session.
524    pub fn add_notices(&self, notices: impl IntoIterator<Item = AdapterNotice>) {
525        for notice in notices {
526            let _ = self.notices_tx.send(notice);
527        }
528    }
529
530    /// Awaits a possible notice.
531    ///
532    /// This method is cancel safe.
533    pub async fn recv_notice(&mut self) -> AdapterNotice {
534        // This method is cancel safe because recv is cancel safe.
535        loop {
536            let notice = self
537                .notices_rx
538                .recv()
539                .await
540                .expect("Session also holds a sender, so recv won't ever return None");
541            match self.notice_filter(notice) {
542                Some(notice) => return notice,
543                None => continue,
544            }
545        }
546    }
547
548    /// Returns a draining iterator over the notices attached to the session.
549    pub fn drain_notices(&mut self) -> Vec<AdapterNotice> {
550        let mut notices = Vec::new();
551        while let Ok(notice) = self.notices_rx.try_recv() {
552            if let Some(notice) = self.notice_filter(notice) {
553                notices.push(notice);
554            }
555        }
556        notices
557    }
558
559    /// Returns Some if the notice should be reported, otherwise None.
560    fn notice_filter(&self, notice: AdapterNotice) -> Option<AdapterNotice> {
561        // Filter out low threshold severity.
562        let minimum_client_severity = self.vars.client_min_messages();
563        let sev = notice.severity();
564        if !minimum_client_severity.should_output_to_client(&sev) {
565            return None;
566        }
567        // Filter out notices for other clusters.
568        if let AdapterNotice::ClusterReplicaStatusChanged { cluster, .. } = &notice {
569            if cluster != self.vars.cluster() {
570                return None;
571            }
572        }
573        Some(notice)
574    }
575
576    /// Sets the transaction ops to `TransactionOps::None`. Must only be used after
577    /// verifying that no transaction anomalies will occur if cleared.
578    pub fn clear_transaction_ops(&mut self) {
579        if let Some(txn) = self.transaction.inner_mut() {
580            txn.ops = TransactionOps::None;
581        }
582    }
583
584    /// If the current transaction ops belong to a read, then sets the
585    /// ops to `None`, returning the old read timestamp context if
586    /// any existed. Must only be used after verifying that no transaction
587    /// anomalies will occur if cleared.
588    pub fn take_transaction_timestamp_context(&mut self) -> Option<TimestampContext<T>> {
589        if let Some(Transaction { ops, .. }) = self.transaction.inner_mut() {
590            if let TransactionOps::Peeks { .. } = ops {
591                let ops = std::mem::take(ops);
592                Some(
593                    ops.timestamp_determination()
594                        .expect("checked above")
595                        .timestamp_context,
596                )
597            } else {
598                None
599            }
600        } else {
601            None
602        }
603    }
604
605    /// Returns the transaction's read timestamp determination, if set.
606    ///
607    /// Returns `None` if there is no active transaction, or if the active
608    /// transaction is not a read transaction.
609    pub fn get_transaction_timestamp_determination(&self) -> Option<TimestampDetermination<T>> {
610        match self.transaction.inner() {
611            Some(Transaction {
612                pcx: _,
613                ops: TransactionOps::Peeks { determination, .. },
614                write_lock_guards: _,
615                access: _,
616                id: _,
617            }) => Some(determination.clone()),
618            _ => None,
619        }
620    }
621
622    /// Whether this session has a timestamp for a read transaction.
623    pub fn contains_read_timestamp(&self) -> bool {
624        matches!(
625            self.transaction.inner(),
626            Some(Transaction {
627                pcx: _,
628                ops: TransactionOps::Peeks {
629                    determination: TimestampDetermination {
630                        timestamp_context: TimestampContext::TimelineTimestamp { .. },
631                        ..
632                    },
633                    ..
634                },
635                write_lock_guards: _,
636                access: _,
637                id: _,
638            })
639        )
640    }
641
642    /// Registers the prepared statement under `name`.
643    pub fn set_prepared_statement(
644        &mut self,
645        name: String,
646        stmt: Option<Statement<Raw>>,
647        raw_sql: String,
648        desc: StatementDesc,
649        state_revision: StateRevision,
650        now: EpochMillis,
651    ) {
652        let logging = PreparedStatementLoggingInfo::still_to_log(
653            raw_sql,
654            stmt.as_ref(),
655            now,
656            name.clone(),
657            self.uuid,
658            false,
659        );
660        let statement = PreparedStatement {
661            stmt,
662            desc,
663            state_revision,
664            logging: Arc::new(QCell::new(&self.qcell_owner, logging)),
665        };
666        self.prepared_statements.insert(name, statement);
667    }
668
669    /// Removes the prepared statement associated with `name`.
670    ///
671    /// Returns whether a statement previously existed.
672    pub fn remove_prepared_statement(&mut self, name: &str) -> bool {
673        self.prepared_statements.remove(name).is_some()
674    }
675
676    /// Removes all prepared statements.
677    pub fn remove_all_prepared_statements(&mut self) {
678        self.prepared_statements.clear();
679    }
680
681    /// Retrieves the prepared statement associated with `name`.
682    ///
683    /// This is unverified and could be incorrect if the underlying catalog has
684    /// changed.
685    pub fn get_prepared_statement_unverified(&self, name: &str) -> Option<&PreparedStatement> {
686        self.prepared_statements.get(name)
687    }
688
689    /// Retrieves the prepared statement associated with `name`.
690    ///
691    /// This is unverified and could be incorrect if the underlying catalog has
692    /// changed.
693    pub fn get_prepared_statement_mut_unverified(
694        &mut self,
695        name: &str,
696    ) -> Option<&mut PreparedStatement> {
697        self.prepared_statements.get_mut(name)
698    }
699
700    /// Returns the prepared statements for the session.
701    pub fn prepared_statements(&self) -> &BTreeMap<String, PreparedStatement> {
702        &self.prepared_statements
703    }
704
705    /// Returns the portals for the session.
706    pub fn portals(&self) -> &BTreeMap<String, Portal> {
707        &self.portals
708    }
709
710    /// Binds the specified portal to the specified prepared statement.
711    ///
712    /// If the prepared statement contains parameters, the values and types of
713    /// those parameters must be provided in `params`. It is the caller's
714    /// responsibility to ensure that the correct number of parameters is
715    /// provided.
716    ///
717    /// The `results_formats` parameter sets the desired format of the results,
718    /// and is stored on the portal.
719    pub fn set_portal(
720        &mut self,
721        portal_name: String,
722        desc: StatementDesc,
723        stmt: Option<Statement<Raw>>,
724        logging: Arc<QCell<PreparedStatementLoggingInfo>>,
725        params: Vec<(Datum, SqlScalarType)>,
726        result_formats: Vec<Format>,
727        state_revision: StateRevision,
728    ) -> Result<(), AdapterError> {
729        // The empty portal can be silently replaced.
730        if !portal_name.is_empty() && self.portals.contains_key(&portal_name) {
731            return Err(AdapterError::DuplicateCursor(portal_name));
732        }
733        self.state_revision += 1;
734        let param_types = desc.param_types.clone();
735        self.portals.insert(
736            portal_name,
737            Portal {
738                stmt: stmt.map(Arc::new),
739                desc,
740                state_revision,
741                parameters: Params {
742                    datums: Row::pack(params.iter().map(|(d, _t)| d)),
743                    execute_types: params.into_iter().map(|(_d, t)| t).collect(),
744                    expected_types: param_types,
745                },
746                result_formats,
747                state: PortalState::NotStarted,
748                logging,
749                lifecycle_timestamps: None,
750            },
751        );
752        Ok(())
753    }
754
755    /// Removes the specified portal.
756    ///
757    /// If there is no such portal, this method does nothing. Returns whether that portal existed.
758    pub fn remove_portal(&mut self, portal_name: &str) -> bool {
759        self.state_revision += 1;
760        self.portals.remove(portal_name).is_some()
761    }
762
763    /// Retrieves a reference to the specified portal.
764    ///
765    /// If there is no such portal, returns `None`.
766    pub fn get_portal_unverified(&self, portal_name: &str) -> Option<&Portal> {
767        self.portals.get(portal_name)
768    }
769
770    /// Retrieves a mutable reference to the specified portal.
771    ///
772    /// If there is no such portal, returns `None`.
773    ///
774    /// Note: When using the returned `PortalRefMut`, there is no need to increment
775    /// `Session::state_revision`, because the portal's meaning is not changed.
776    pub fn get_portal_unverified_mut(&mut self, portal_name: &str) -> Option<PortalRefMut<'_>> {
777        self.portals.get_mut(portal_name).map(|p| PortalRefMut {
778            stmt: &p.stmt,
779            desc: &p.desc,
780            state_revision: &mut p.state_revision,
781            parameters: &mut p.parameters,
782            result_formats: &mut p.result_formats,
783            logging: &mut p.logging,
784            state: &mut p.state,
785            lifecycle_timestamps: &mut p.lifecycle_timestamps,
786        })
787    }
788
789    /// Creates and installs a new portal.
790    pub fn create_new_portal(
791        &mut self,
792        stmt: Option<Statement<Raw>>,
793        logging: Arc<QCell<PreparedStatementLoggingInfo>>,
794        desc: StatementDesc,
795        parameters: Params,
796        result_formats: Vec<Format>,
797        state_revision: StateRevision,
798    ) -> Result<String, AdapterError> {
799        self.state_revision += 1;
800
801        // See: https://github.com/postgres/postgres/blob/84f5c2908dad81e8622b0406beea580e40bb03ac/src/backend/utils/mmgr/portalmem.c#L234
802        for i in 0usize.. {
803            let name = format!("<unnamed portal {}>", i);
804            match self.portals.entry(name.clone()) {
805                Entry::Occupied(_) => continue,
806                Entry::Vacant(entry) => {
807                    entry.insert(Portal {
808                        stmt: stmt.map(Arc::new),
809                        desc,
810                        state_revision,
811                        parameters,
812                        result_formats,
813                        state: PortalState::NotStarted,
814                        logging,
815                        lifecycle_timestamps: None,
816                    });
817                    return Ok(name);
818                }
819            }
820        }
821
822        coord_bail!("unable to create a new portal");
823    }
824
    /// Resets the session to its initial state.
    ///
    /// Clears the current transaction, removes all prepared statements, and calls
    /// `reset_all` on the session variables. Nothing is returned: whatever
    /// `clear_transaction` returns is discarded here.
    pub fn reset(&mut self) {
        // The result of clearing the transaction is not needed by this method.
        let _ = self.clear_transaction();
        self.prepared_statements.clear();
        self.vars.reset_all();
    }
832
    /// Returns the [application_name] that created this session.
    ///
    /// The value is read from the session variables.
    ///
    /// [application_name]: https://www.postgresql.org/docs/current/runtime-config-logging.html#GUC-APPLICATION-NAME
    pub fn application_name(&self) -> &str {
        self.vars.application_name()
    }
839
    /// Returns a shared reference to the variables ([`SessionVars`]) in this session.
    pub fn vars(&self) -> &SessionVars {
        &self.vars
    }
844
    /// Returns a mutable reference to the variables ([`SessionVars`]) in this session.
    pub fn vars_mut(&mut self) -> &mut SessionVars {
        &mut self.vars
    }
849
    /// Grants a set of write locks to this session's inner [`Transaction`].
    ///
    /// # Errors
    /// Forwards the result of [`TransactionStatus::try_grant_write_locks`]: if the
    /// transaction has already been granted write locks, a reference to the locks it
    /// already holds is returned.
    ///
    /// # Panics
    /// If the inner transaction is idle. See [`TransactionStatus::try_grant_write_locks`].
    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
        self.transaction.try_grant_write_locks(guards)
    }
858
859    /// Drains any external metadata updates and applies the changes from the latest update.
860    pub fn apply_external_metadata_updates(&mut self) {
861        // If no sender is registered then there isn't anything to do.
862        let Some(rx) = &mut self.external_metadata_rx else {
863            return;
864        };
865
866        // If the value hasn't changed then return.
867        if !rx.has_changed().unwrap_or(false) {
868            return;
869        }
870
871        // Update our metadata! Note the short critical section (just a clone) to avoid blocking
872        // the sending side of this watch channel.
873        let metadata = rx.borrow_and_update().clone();
874        self.vars.set_external_user_metadata(metadata);
875    }
876
    /// Applies the internal user metadata to the session by passing it on to the session
    /// variables.
    pub fn apply_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
        self.vars.set_internal_user_metadata(metadata);
    }
881
    /// Initializes the session's role metadata from `role_id`.
    ///
    /// Any role metadata that was set before is overwritten.
    pub fn initialize_role_metadata(&mut self, role_id: RoleId) {
        self.role_metadata = Some(RoleMetadata::new(role_id));
    }
886
    /// Ensures that a timestamp oracle exists for `timeline` and returns a mutable reference to
    /// the timestamp oracle.
    ///
    /// If the session has no oracle for `timeline` yet, one is created from `T::minimum()`
    /// and a [`NowFn`] built from `T::minimum`.
    pub fn ensure_timestamp_oracle(
        &mut self,
        timeline: Timeline,
    ) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
        self.session_oracles
            .entry(timeline)
            // Only constructs a new oracle when the entry is vacant.
            .or_insert_with(|| InMemoryTimestampOracle::new(T::minimum(), NowFn::from(T::minimum)))
    }
897
    /// Ensures that a timestamp oracle exists for reads and writes from/to a local input and
    /// returns a mutable reference to the timestamp oracle.
    ///
    /// This is [`Self::ensure_timestamp_oracle`] applied to [`Timeline::EpochMilliseconds`].
    pub fn ensure_local_timestamp_oracle(&mut self) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
        self.ensure_timestamp_oracle(Timeline::EpochMilliseconds)
    }
903
    /// Returns a reference to the timestamp oracle for `timeline`, or `None` if this session
    /// has no oracle for that timeline.
    ///
    /// Unlike [`Self::ensure_timestamp_oracle`], this never creates an oracle.
    pub fn get_timestamp_oracle(
        &self,
        timeline: &Timeline,
    ) -> Option<&InMemoryTimestampOracle<T, NowFn<T>>> {
        self.session_oracles.get(timeline)
    }
911
912    /// If the current session is using the Strong Session Serializable isolation level advance the
913    /// session local timestamp oracle to `write_ts`.
914    pub fn apply_write(&mut self, timestamp: T) {
915        if self.vars().transaction_isolation() == &IsolationLevel::StrongSessionSerializable {
916            self.ensure_local_timestamp_oracle().apply_write(timestamp);
917        }
918    }
919
    /// Returns a reference to the [`SessionMetrics`] instance associated with this [`Session`].
    pub fn metrics(&self) -> &SessionMetrics {
        &self.metrics
    }
924
    /// Sets the `BuiltinTableAppendNotify` for this session.
    ///
    /// The new value replaces any previously stashed one. A previous value is not
    /// expected: its presence is reported through `soft_assert_or_log!`.
    pub fn set_builtin_table_updates(&mut self, fut: BuiltinTableAppendNotify) {
        let prev = self.builtin_updates.replace(fut);
        mz_ore::soft_assert_or_log!(prev.is_none(), "replacing old builtin table notify");
    }
930
931    /// Takes the stashed `BuiltinTableAppendNotify`, if one exists, and returns a [`Future`] that
932    /// waits for the writes to complete.
933    pub fn clear_builtin_table_updates(&mut self) -> Option<impl Future<Output = ()> + 'static> {
934        if let Some(fut) = self.builtin_updates.take() {
935            // Record how long we blocked for, if we blocked at all.
936            let histogram = self
937                .metrics()
938                .session_startup_table_writes_seconds()
939                .clone();
940            Some(async move {
941                fut.wall_time().observe(histogram).await;
942            })
943        } else {
944            None
945        }
946    }
947
    /// Returns the current `state_revision` of the session, which can be used by dependent
    /// objects for knowing when to re-plan due to session state changes.
    pub fn state_revision(&self) -> u64 {
        self.state_revision
    }
953}
954
/// A prepared statement.
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct PreparedStatement {
    /// The AST of the statement, or `None` if the prepared statement was the empty query.
    stmt: Option<Statement<Raw>>,
    /// The description of the prepared statement.
    desc: StatementDesc,
    /// The most recent state revision that has verified this statement.
    pub state_revision: StateRevision,
    /// A handle to the metadata for statement logging. Left out of the `Debug` output.
    #[derivative(Debug = "ignore")]
    logging: Arc<QCell<PreparedStatementLoggingInfo>>,
}
966
/// Read-only accessors for the private fields of a [`PreparedStatement`].
impl PreparedStatement {
    /// Returns the AST associated with this prepared statement,
    /// if the prepared statement was not the empty query.
    pub fn stmt(&self) -> Option<&Statement<Raw>> {
        self.stmt.as_ref()
    }

    /// Returns the description of the prepared statement.
    pub fn desc(&self) -> &StatementDesc {
        &self.desc
    }

    /// Returns a handle to the metadata for statement logging.
    ///
    /// The handle is returned by reference; no reference count is touched here.
    pub fn logging(&self) -> &Arc<QCell<PreparedStatementLoggingInfo>> {
        &self.logging
    }
}
984
/// A portal represents the execution state of a running or runnable query.
///
/// The `logging` and `state` fields are left out of the `Debug` output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Portal {
    /// The statement that is bound to this portal, if any.
    pub stmt: Option<Arc<Statement<Raw>>>,
    /// The statement description.
    pub desc: StatementDesc,
    /// The most recent state revision that has verified this portal.
    pub state_revision: StateRevision,
    /// The bound values for the parameters in the prepared statement, if any.
    pub parameters: Params,
    /// The desired output format for each column in the result set.
    pub result_formats: Vec<Format>,
    /// A handle to metadata needed for statement logging.
    #[derivative(Debug = "ignore")]
    pub logging: Arc<QCell<PreparedStatementLoggingInfo>>,
    /// The execution state of the portal.
    #[derivative(Debug = "ignore")]
    pub state: PortalState,
    /// Statement lifecycle timestamps coming from `mz-pgwire`, if they have been set.
    pub lifecycle_timestamps: Option<LifecycleTimestamps>,
}
1008
/// A mutable reference to a portal, capturing its state and associated metadata. Importantly, it
/// does _not_ give _mutable_ access to `stmt` and `desc`, which means that you do not need to
/// increment `Session::state_revision` when modifying fields through a `PortalRefMut`, because the
/// portal's meaning is not changed.
///
/// Each field borrows the field of the same name in [`Portal`].
pub struct PortalRefMut<'a> {
    /// The statement that is bound to this portal (read-only).
    pub stmt: &'a Option<Arc<Statement<Raw>>>,
    /// The statement description (read-only).
    pub desc: &'a StatementDesc,
    /// The most recent state revision that has verified this portal.
    pub state_revision: &'a mut StateRevision,
    /// The bound values for the parameters in the prepared statement, if any.
    pub parameters: &'a mut Params,
    /// The desired output format for each column in the result set.
    pub result_formats: &'a mut Vec<Format>,
    /// A handle to metadata needed for statement logging.
    pub logging: &'a mut Arc<QCell<PreparedStatementLoggingInfo>>,
    /// The execution state of the portal.
    pub state: &'a mut PortalState,
    /// Statement lifecycle timestamps coming from `mz-pgwire`.
    pub lifecycle_timestamps: &'a mut Option<LifecycleTimestamps>,
}
1031
/// Points to a revision of catalog state and session state. When the current revisions are not the
/// same as the revisions when a prepared statement or a portal was described, we need to check
/// whether the description is still valid.
///
/// Two `StateRevision`s are equal exactly when both revision numbers are equal. Because both
/// fields are plain integers, equality is total, so the type is `Eq` as well as `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StateRevision {
    /// A revision of the catalog.
    pub catalog_revision: u64,
    /// A revision of the session state.
    pub session_state_revision: u64,
}
1042
/// Execution states of a portal.
pub enum PortalState {
    /// Portal not yet started.
    NotStarted,
    /// Portal is a rows-returning statement in progress with 0 or more rows
    /// remaining.
    InProgress(Option<InProgressRows>),
    /// Portal has completed and should not be re-executed. If the optional string
    /// is present, it is returned as a `CommandComplete` tag; otherwise an error
    /// is sent.
    Completed(Option<String>),
}
1055
/// State of an in-progress, rows-returning portal.
pub struct InProgressRows {
    /// The current batch of rows, or `None` if no batch is currently stashed.
    pub current: Option<Box<dyn RowIterator + Send + Sync>>,
    /// A stream from which to fetch more row batches.
    pub remaining: RecordFirstRowStream,
}
1063
1064impl InProgressRows {
1065    /// Creates a new InProgressRows from a batch stream.
1066    pub fn new(remaining: RecordFirstRowStream) -> Self {
1067        Self {
1068            current: None,
1069            remaining,
1070        }
1071    }
1072
1073    /// Determines whether the underlying stream has ended and there are also no more rows
1074    /// stashed in `current`.
1075    pub fn no_more_rows(&self) -> bool {
1076        self.remaining.no_more_rows && self.current.is_none()
1077    }
1078}
1079
/// A channel of batched rows: the receiving side, yielding [`PeekResponseUnary`] values.
pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
1082
/// Part of statement lifecycle. These are timestamps that come from the Adapter frontend
/// (`mz-pgwire`) part of the lifecycle.
#[derive(Debug, Clone)]
pub struct LifecycleTimestamps {
    /// When the query was received. More specifically, when the tokio recv returned.
    /// For a Simple Query, this is for the whole query; for the Extended Query flow, this is
    /// only for `FrontendMessage::Execute`. (This means that this is after parsing for the
    /// Extended Query flow.)
    pub received: EpochMillis,
}
1093
impl LifecycleTimestamps {
    /// Creates a new `LifecycleTimestamps` whose `received` timestamp is the given value.
    pub fn new(received: EpochMillis) -> Self {
        Self { received }
    }
}
1100
/// The transaction status of a session.
///
/// PostgreSQL's transaction states are in backend/access/transam/xact.c.
#[derive(Debug)]
pub enum TransactionStatus<T> {
    /// Idle. Matches `TBLOCK_DEFAULT`.
    Default,
    /// Running a single-query transaction. Matches
    /// `TBLOCK_STARTED`. In PostgreSQL, when using the extended query protocol, this
    /// may be upgraded into a multi-statement implicit query (see
    /// [`Self::InTransactionImplicit`]).
    /// Additionally, some statements may trigger an eager commit of the implicit transaction,
    /// see: <https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137>. In
    /// Materialize, however, we eagerly commit all statements outside of an explicit transaction
    /// when using the extended query protocol. Therefore, we can guarantee that this state will
    /// always be a single-query transaction and never be upgraded into a multi-statement implicit
    /// query.
    Started(Transaction<T>),
    /// Currently in a transaction issued from a `BEGIN`. Matches `TBLOCK_INPROGRESS`.
    InTransaction(Transaction<T>),
    /// Currently in an implicit transaction started from a multi-statement query
    /// with more than one statement. Matches `TBLOCK_IMPLICIT_INPROGRESS`.
    InTransactionImplicit(Transaction<T>),
    /// In a failed transaction. Matches `TBLOCK_ABORT`.
    Failed(Transaction<T>),
}
1126
1127impl<T: TimestampManipulation> TransactionStatus<T> {
1128    /// Extracts the inner transaction ops and write lock guard if not failed.
1129    pub fn into_ops_and_lock_guard(self) -> (Option<TransactionOps<T>>, Option<WriteLocks>) {
1130        match self {
1131            TransactionStatus::Default | TransactionStatus::Failed(_) => (None, None),
1132            TransactionStatus::Started(txn)
1133            | TransactionStatus::InTransaction(txn)
1134            | TransactionStatus::InTransactionImplicit(txn) => {
1135                (Some(txn.ops), txn.write_lock_guards)
1136            }
1137        }
1138    }
1139
1140    /// Exposes the inner transaction.
1141    pub fn inner(&self) -> Option<&Transaction<T>> {
1142        match self {
1143            TransactionStatus::Default => None,
1144            TransactionStatus::Started(txn)
1145            | TransactionStatus::InTransaction(txn)
1146            | TransactionStatus::InTransactionImplicit(txn)
1147            | TransactionStatus::Failed(txn) => Some(txn),
1148        }
1149    }
1150
1151    /// Exposes the inner transaction.
1152    pub fn inner_mut(&mut self) -> Option<&mut Transaction<T>> {
1153        match self {
1154            TransactionStatus::Default => None,
1155            TransactionStatus::Started(txn)
1156            | TransactionStatus::InTransaction(txn)
1157            | TransactionStatus::InTransactionImplicit(txn)
1158            | TransactionStatus::Failed(txn) => Some(txn),
1159        }
1160    }
1161
1162    /// Whether the transaction's ops are DDL.
1163    pub fn is_ddl(&self) -> bool {
1164        match self {
1165            TransactionStatus::Default => false,
1166            TransactionStatus::Started(txn)
1167            | TransactionStatus::InTransaction(txn)
1168            | TransactionStatus::InTransactionImplicit(txn)
1169            | TransactionStatus::Failed(txn) => {
1170                matches!(txn.ops, TransactionOps::DDL { .. })
1171            }
1172        }
1173    }
1174
1175    /// Expresses whether or not the transaction was implicitly started.
1176    /// However, its negation does not imply explicitly started.
1177    pub fn is_implicit(&self) -> bool {
1178        match self {
1179            TransactionStatus::Started(_) | TransactionStatus::InTransactionImplicit(_) => true,
1180            TransactionStatus::Default
1181            | TransactionStatus::InTransaction(_)
1182            | TransactionStatus::Failed(_) => false,
1183        }
1184    }
1185
1186    /// Whether the transaction may contain multiple statements.
1187    pub fn is_in_multi_statement_transaction(&self) -> bool {
1188        match self {
1189            TransactionStatus::InTransaction(_) | TransactionStatus::InTransactionImplicit(_) => {
1190                true
1191            }
1192            TransactionStatus::Default
1193            | TransactionStatus::Started(_)
1194            | TransactionStatus::Failed(_) => false,
1195        }
1196    }
1197
1198    /// Whether we are in a multi-statement transaction, AND the query is immediate.
1199    pub fn in_immediate_multi_stmt_txn(&self, when: &QueryWhen) -> bool {
1200        self.is_in_multi_statement_transaction() && when == &QueryWhen::Immediately
1201    }
1202
1203    /// Grants the writes lock to the inner transaction, returning an error if the transaction
1204    /// has already been granted write locks.
1205    ///
1206    /// # Panics
1207    /// If `self` is `TransactionStatus::Default`, which indicates that the
1208    /// transaction is idle, which is not appropriate to assign the
1209    /// coordinator's write lock to.
1210    ///
1211    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1212        match self {
1213            TransactionStatus::Default => panic!("cannot grant write lock to txn not yet started"),
1214            TransactionStatus::Started(txn)
1215            | TransactionStatus::InTransaction(txn)
1216            | TransactionStatus::InTransactionImplicit(txn)
1217            | TransactionStatus::Failed(txn) => txn.try_grant_write_locks(guards),
1218        }
1219    }
1220
1221    /// Returns the currently held [`WriteLocks`], if this transaction holds any.
1222    pub fn write_locks(&self) -> Option<&WriteLocks> {
1223        match self {
1224            TransactionStatus::Default => None,
1225            TransactionStatus::Started(txn)
1226            | TransactionStatus::InTransaction(txn)
1227            | TransactionStatus::InTransactionImplicit(txn)
1228            | TransactionStatus::Failed(txn) => txn.write_lock_guards.as_ref(),
1229        }
1230    }
1231
1232    /// The timeline of the transaction, if one exists.
1233    pub fn timeline(&self) -> Option<Timeline> {
1234        match self {
1235            TransactionStatus::Default => None,
1236            TransactionStatus::Started(txn)
1237            | TransactionStatus::InTransaction(txn)
1238            | TransactionStatus::InTransactionImplicit(txn)
1239            | TransactionStatus::Failed(txn) => txn.timeline(),
1240        }
1241    }
1242
1243    /// The cluster of the transaction, if one exists.
1244    pub fn cluster(&self) -> Option<ClusterId> {
1245        match self {
1246            TransactionStatus::Default => None,
1247            TransactionStatus::Started(txn)
1248            | TransactionStatus::InTransaction(txn)
1249            | TransactionStatus::InTransactionImplicit(txn)
1250            | TransactionStatus::Failed(txn) => txn.cluster(),
1251        }
1252    }
1253
1254    /// Snapshot of the catalog that reflects DDL operations run in this transaction.
1255    pub fn catalog_state(&self) -> Option<&CatalogState> {
1256        match self.inner() {
1257            Some(Transaction {
1258                ops: TransactionOps::DDL { state, .. },
1259                ..
1260            }) => Some(state),
1261            _ => None,
1262        }
1263    }
1264
1265    /// Reports whether any operations have been executed as part of this transaction
1266    pub fn contains_ops(&self) -> bool {
1267        match self.inner() {
1268            Some(txn) => txn.contains_ops(),
1269            None => false,
1270        }
1271    }
1272
1273    /// Checks whether the current state of this transaction allows writes
1274    /// (adding write ops).
1275    /// transaction
1276    pub fn allows_writes(&self) -> bool {
1277        match self {
1278            TransactionStatus::Started(Transaction { ops, access, .. })
1279            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1280            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1281                match ops {
1282                    TransactionOps::None => access != &Some(TransactionAccessMode::ReadOnly),
1283                    TransactionOps::Peeks { determination, .. } => {
1284                        // If-and-only-if peeks thus far do not have a timestamp
1285                        // (i.e. they are constant), we can switch to a write
1286                        // transaction.
1287                        !determination.timestamp_context.contains_timestamp()
1288                    }
1289                    TransactionOps::Subscribe => false,
1290                    TransactionOps::Writes(_) => true,
1291                    TransactionOps::SingleStatement { .. } => false,
1292                    TransactionOps::DDL { .. } => false,
1293                }
1294            }
1295            TransactionStatus::Default | TransactionStatus::Failed(_) => {
1296                unreachable!()
1297            }
1298        }
1299    }
1300
1301    /// Adds operations to the current transaction. An error is produced if they cannot be merged
1302    /// (i.e., a timestamp-dependent read cannot be merged to an insert).
1303    ///
1304    /// The `DDL` variant is an exception and does not merge operations, but instead overwrites the
1305    /// old ops with the new ops. This is correct because it is only used in conjunction with the
1306    /// Dry Run catalog op which returns an error containing all of the ops, and those ops are
1307    /// passed to this function which then overwrites.
1308    ///
1309    /// # Panics
1310    /// If the operations are compatible but the operation metadata doesn't match. Such as reads at
1311    /// different timestamps, reads on different timelines, reads on different clusters, etc. It's
1312    /// up to the caller to make sure these are aligned.
1313    pub fn add_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
1314        match self {
1315            TransactionStatus::Started(Transaction { ops, access, .. })
1316            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1317            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1318                match ops {
1319                    TransactionOps::None => {
1320                        if matches!(access, Some(TransactionAccessMode::ReadOnly))
1321                            && matches!(add_ops, TransactionOps::Writes(_))
1322                        {
1323                            return Err(AdapterError::ReadOnlyTransaction);
1324                        }
1325                        *ops = add_ops;
1326                    }
1327                    TransactionOps::Peeks {
1328                        determination,
1329                        cluster_id,
1330                        requires_linearization,
1331                    } => match add_ops {
1332                        TransactionOps::Peeks {
1333                            determination: add_timestamp_determination,
1334                            cluster_id: add_cluster_id,
1335                            requires_linearization: add_requires_linearization,
1336                        } => {
1337                            assert_eq!(*cluster_id, add_cluster_id);
1338                            match (
1339                                &determination.timestamp_context,
1340                                &add_timestamp_determination.timestamp_context,
1341                            ) {
1342                                (
1343                                    TimestampContext::TimelineTimestamp {
1344                                        timeline: txn_timeline,
1345                                        chosen_ts: txn_ts,
1346                                        oracle_ts: _,
1347                                    },
1348                                    TimestampContext::TimelineTimestamp {
1349                                        timeline: add_timeline,
1350                                        chosen_ts: add_ts,
1351                                        oracle_ts: _,
1352                                    },
1353                                ) => {
1354                                    assert_eq!(txn_timeline, add_timeline);
1355                                    assert_eq!(txn_ts, add_ts);
1356                                }
1357                                (TimestampContext::NoTimestamp, _) => {
1358                                    *determination = add_timestamp_determination
1359                                }
1360                                (_, TimestampContext::NoTimestamp) => {}
1361                            };
1362                            if matches!(requires_linearization, RequireLinearization::NotRequired)
1363                                && matches!(
1364                                    add_requires_linearization,
1365                                    RequireLinearization::Required
1366                                )
1367                            {
1368                                *requires_linearization = add_requires_linearization;
1369                            }
1370                        }
1371                        // Iff peeks thus far do not have a timestamp (i.e.
1372                        // they are constant), we can switch to a write
1373                        // transaction.
1374                        writes @ TransactionOps::Writes(..)
1375                            if !determination.timestamp_context.contains_timestamp() =>
1376                        {
1377                            *ops = writes;
1378                        }
1379                        _ => return Err(AdapterError::ReadOnlyTransaction),
1380                    },
1381                    TransactionOps::Subscribe => {
1382                        return Err(AdapterError::SubscribeOnlyTransaction);
1383                    }
1384                    TransactionOps::Writes(txn_writes) => match add_ops {
1385                        TransactionOps::Writes(mut add_writes) => {
1386                            // We should have already checked the access above, but make sure we don't miss
1387                            // it anyway.
1388                            assert!(!matches!(access, Some(TransactionAccessMode::ReadOnly)));
1389                            txn_writes.append(&mut add_writes);
1390                        }
1391                        // Iff peeks do not have a timestamp (i.e. they are
1392                        // constant), we can permit them.
1393                        TransactionOps::Peeks { determination, .. }
1394                            if !determination.timestamp_context.contains_timestamp() => {}
1395                        _ => {
1396                            return Err(AdapterError::WriteOnlyTransaction);
1397                        }
1398                    },
1399                    TransactionOps::SingleStatement { .. } => {
1400                        return Err(AdapterError::SingleStatementTransaction);
1401                    }
1402                    TransactionOps::DDL {
1403                        ops: og_ops,
1404                        revision: og_revision,
1405                        state: og_state,
1406                        side_effects,
1407                        snapshot: og_snapshot,
1408                    } => match add_ops {
1409                        TransactionOps::DDL {
1410                            ops: new_ops,
1411                            revision: new_revision,
1412                            side_effects: mut net_new_side_effects,
1413                            state: new_state,
1414                            snapshot: new_snapshot,
1415                        } => {
1416                            if *og_revision != new_revision {
1417                                return Err(AdapterError::DDLTransactionRace);
1418                            }
1419                            // The old og_ops are overwritten, not extended.
1420                            if !new_ops.is_empty() {
1421                                *og_ops = new_ops;
1422                                *og_state = new_state;
1423                                *og_snapshot = new_snapshot;
1424                            }
1425                            side_effects.append(&mut net_new_side_effects);
1426                        }
1427                        _ => return Err(AdapterError::DDLOnlyTransaction),
1428                    },
1429                }
1430            }
1431            TransactionStatus::Default | TransactionStatus::Failed(_) => {
1432                unreachable!()
1433            }
1434        }
1435        Ok(())
1436    }
1437}
1438
/// An abstraction allowing us to identify different transactions. See [`Transaction::id`] for
/// the uniqueness guarantees.
pub type TransactionId = u64;
1441
/// The default transaction status is the idle state, [`TransactionStatus::Default`].
impl<T> Default for TransactionStatus<T> {
    // Written by hand so that no bound is placed on `T`.
    fn default() -> Self {
        TransactionStatus::Default
    }
}
1447
/// State data for transactions.
#[derive(Debug)]
pub struct Transaction<T> {
    /// Plan context.
    pub pcx: PlanContext,
    /// Transaction operations.
    pub ops: TransactionOps<T>,
    /// Uniquely identifies the transaction on a per connection basis.
    /// Two transactions started from separate connections may share the
    /// same ID.
    /// If all IDs have been exhausted, this will wrap around back to 0.
    pub id: TransactionId,
    /// Locks for objects this transaction will operate on. `None` until write locks have
    /// been granted.
    write_lock_guards: Option<WriteLocks>,
    /// Access mode (read only, read write), if any.
    access: Option<TransactionAccessMode>,
}
1465
impl<T> Transaction<T> {
    /// Tries to grant the write lock to this transaction for the remainder of its lifetime. Errors
    /// if this [`Transaction`] has already been granted write locks.
    ///
    /// On error, the returned reference points at the locks that are already held; the
    /// passed-in `guards` are dropped.
    fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
        match &mut self.write_lock_guards {
            Some(existing) => Err(existing),
            // Bind the empty slot itself so it can be filled in place.
            locks @ None => {
                *locks = Some(guards);
                Ok(())
            }
        }
    }

    /// The timeline of the transaction, if one exists.
    ///
    /// Only peeks whose timestamp context is a `TimelineTimestamp` have a timeline.
    fn timeline(&self) -> Option<Timeline> {
        match &self.ops {
            TransactionOps::Peeks {
                determination:
                    TimestampDetermination {
                        timestamp_context: TimestampContext::TimelineTimestamp { timeline, .. },
                        ..
                    },
                ..
            } => Some(timeline.clone()),
            // Peeks with any other timestamp context, and all non-peek ops.
            TransactionOps::Peeks { .. }
            | TransactionOps::None
            | TransactionOps::Subscribe
            | TransactionOps::Writes(_)
            | TransactionOps::SingleStatement { .. }
            | TransactionOps::DDL { .. } => None,
        }
    }

    /// The cluster of the transaction, if one exists.
    ///
    /// Only peek transactions report a cluster.
    pub fn cluster(&self) -> Option<ClusterId> {
        match &self.ops {
            TransactionOps::Peeks { cluster_id, .. } => Some(cluster_id.clone()),
            TransactionOps::None
            | TransactionOps::Subscribe
            | TransactionOps::Writes(_)
            | TransactionOps::SingleStatement { .. }
            | TransactionOps::DDL { .. } => None,
        }
    }

    /// Reports whether any operations have been executed as part of this transaction,
    /// i.e. whether `ops` is anything other than [`TransactionOps::None`].
    fn contains_ops(&self) -> bool {
        !matches!(self.ops, TransactionOps::None)
    }
}
1516
/// A transaction's status code.
///
/// Each code converts to a single byte (see the `From<TransactionCode> for u8` impl):
/// `I`, `T`, or `E`.
#[derive(Debug, Clone, Copy)]
pub enum TransactionCode {
    /// Not currently in a transaction.
    Idle,
    /// Currently in a transaction.
    InTransaction,
    /// Currently in a transaction block which is failed.
    Failed,
}
1527
/// Converts a transaction status code into its single-byte ASCII representation.
// NOTE(review): these look like the PostgreSQL wire protocol's transaction status
// indicator bytes — confirm against the pgwire encoder.
impl From<TransactionCode> for u8 {
    fn from(code: TransactionCode) -> Self {
        match code {
            TransactionCode::Idle => b'I',
            TransactionCode::InTransaction => b'T',
            TransactionCode::Failed => b'E',
        }
    }
}
1537
1538impl From<TransactionCode> for String {
1539    fn from(code: TransactionCode) -> Self {
1540        char::from(u8::from(code)).to_string()
1541    }
1542}
1543
1544impl<T> From<&TransactionStatus<T>> for TransactionCode {
1545    /// Convert from the Session's version
1546    fn from(status: &TransactionStatus<T>) -> TransactionCode {
1547        match status {
1548            TransactionStatus::Default => TransactionCode::Idle,
1549            TransactionStatus::Started(_) => TransactionCode::InTransaction,
1550            TransactionStatus::InTransaction(_) => TransactionCode::InTransaction,
1551            TransactionStatus::InTransactionImplicit(_) => TransactionCode::InTransaction,
1552            TransactionStatus::Failed(_) => TransactionCode::Failed,
1553        }
1554    }
1555}
1556
1557/// The type of operation being performed by the transaction.
1558///
1559/// This is needed because we currently do not allow mixing reads and writes in
1560/// a transaction. Use this to record what we have done, and what may need to
1561/// happen at commit.
1562#[derive(Derivative)]
1563#[derivative(Debug)]
1564pub enum TransactionOps<T> {
1565    /// The transaction has been initiated, but no statement has yet been executed
1566    /// in it.
1567    None,
1568    /// This transaction has had a peek (`SELECT`, `SUBSCRIBE`). If the inner value
1569    /// is has a timestamp, it must only do other peeks. However, if it doesn't
1570    /// have a timestamp (i.e. the values are constants), the transaction can still
1571    /// perform writes.
1572    Peeks {
1573        /// The timestamp and timestamp related metadata for the peek.
1574        determination: TimestampDetermination<T>,
1575        /// The cluster used to execute peeks.
1576        cluster_id: ClusterId,
1577        /// Whether this peek needs to be linearized.
1578        requires_linearization: RequireLinearization,
1579    },
1580    /// This transaction has done a `SUBSCRIBE` and must do nothing else.
1581    Subscribe,
1582    /// This transaction has had a write (`INSERT`, `UPDATE`, `DELETE`) and must
1583    /// only do other writes, or reads whose timestamp is None (i.e. constants).
1584    Writes(Vec<WriteOp>),
1585    /// This transaction has a prospective statement that will execute during commit.
1586    SingleStatement {
1587        /// The prospective statement.
1588        stmt: Arc<Statement<Raw>>,
1589        /// The statement params.
1590        params: mz_sql::plan::Params,
1591    },
1592    /// This transaction has run some _simple_ DDL and must do nothing else. Any statement/plan that
1593    /// uses this must return false in `must_serialize_ddl()` because this is serialized instead in
1594    /// `sequence_plan()` during `COMMIT`.
1595    DDL {
1596        /// Catalog operations that have already run, and must run before each subsequent op.
1597        ops: Vec<crate::catalog::Op>,
1598        /// In-memory state that reflects the previously applied ops.
1599        state: CatalogState,
1600        /// A list of side effects that should be executed if this DDL transaction commits.
1601        #[derivative(Debug = "ignore")]
1602        side_effects: Vec<
1603            Box<
1604                dyn for<'a> FnOnce(
1605                        &'a mut Coordinator,
1606                        Option<&'a mut ExecuteContext>,
1607                    ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
1608                    + Send
1609                    + Sync,
1610            >,
1611        >,
1612        /// Transient revision of the `Catalog` when this transaction started.
1613        revision: u64,
1614        /// Snapshot of the durable transaction state after the last dry run.
1615        /// Used to initialize the next dry run's transaction so it starts
1616        /// in sync with the accumulated `state`. `None` for the first
1617        /// statement in the transaction (before any dry run).
1618        snapshot: Option<Snapshot>,
1619    },
1620}
1621
1622impl<T> TransactionOps<T> {
1623    fn timestamp_determination(self) -> Option<TimestampDetermination<T>> {
1624        match self {
1625            TransactionOps::Peeks { determination, .. } => Some(determination),
1626            TransactionOps::None
1627            | TransactionOps::Subscribe
1628            | TransactionOps::Writes(_)
1629            | TransactionOps::SingleStatement { .. }
1630            | TransactionOps::DDL { .. } => None,
1631        }
1632    }
1633}
1634
1635impl<T> Default for TransactionOps<T> {
1636    fn default() -> Self {
1637        Self::None
1638    }
1639}
1640
1641/// An `INSERT` waiting to be committed.
1642#[derive(Debug, Clone, PartialEq)]
1643pub struct WriteOp {
1644    /// The target table.
1645    pub id: CatalogItemId,
1646    /// The data rows.
1647    pub rows: TableData,
1648}
1649
/// Indicates whether a transaction has to be linearized.
#[derive(Debug)]
pub enum RequireLinearization {
    /// The transaction must be linearized.
    Required,
    /// The transaction does not need to be linearized.
    NotRequired,
}
1658
1659impl From<&ExplainContext> for RequireLinearization {
1660    fn from(ctx: &ExplainContext) -> Self {
1661        match ctx {
1662            ExplainContext::None | ExplainContext::PlanInsightsNotice(_) => {
1663                RequireLinearization::Required
1664            }
1665            _ => RequireLinearization::NotRequired,
1666        }
1667    }
1668}
1669
1670/// A complete set of exclusive locks for writing to collections identified by [`CatalogItemId`]s.
1671///
1672/// To prevent deadlocks between two sessions, we do not allow acquiring a partial set of locks.
1673#[derive(Debug)]
1674pub struct WriteLocks {
1675    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
1676    /// Connection that currently holds these locks, used for tracing purposes only.
1677    conn_id: ConnectionId,
1678}
1679
1680impl WriteLocks {
1681    /// Create a [`WriteLocksBuilder`] pre-defining all of the locks we need.
1682    ///
1683    /// When "finishing" the builder with [`WriteLocksBuilder::all_or_nothing`], if we haven't
1684    /// acquired all of the necessary locks we drop any partially acquired ones.
1685    pub fn builder(sources: impl IntoIterator<Item = CatalogItemId>) -> WriteLocksBuilder {
1686        let locks = sources.into_iter().map(|gid| (gid, None)).collect();
1687        WriteLocksBuilder { locks }
1688    }
1689
1690    /// Validate this set of [`WriteLocks`] is sufficient for the provided collections.
1691    /// Dropping the currently held locks if it's not.
1692    pub fn validate(
1693        self,
1694        collections: impl Iterator<Item = CatalogItemId>,
1695    ) -> Result<Self, BTreeSet<CatalogItemId>> {
1696        let mut missing = BTreeSet::new();
1697        for collection in collections {
1698            if !self.locks.contains_key(&collection) {
1699                missing.insert(collection);
1700            }
1701        }
1702
1703        if missing.is_empty() {
1704            Ok(self)
1705        } else {
1706            // Explicitly drop the already acquired locks.
1707            drop(self);
1708            Err(missing)
1709        }
1710    }
1711}
1712
1713impl Drop for WriteLocks {
1714    fn drop(&mut self) {
1715        // We may have merged the locks into GroupCommitWriteLocks, thus it could be empty.
1716        if !self.locks.is_empty() {
1717            tracing::info!(
1718                conn_id = %self.conn_id,
1719                locks = ?self.locks,
1720                "dropping write locks",
1721            );
1722        }
1723    }
1724}
1725
1726/// A builder struct that helps us acquire all of the locks we need, or none of them.
1727///
1728/// See [`WriteLocks::builder`].
1729#[derive(Debug)]
1730pub struct WriteLocksBuilder {
1731    locks: BTreeMap<CatalogItemId, Option<tokio::sync::OwnedMutexGuard<()>>>,
1732}
1733
1734impl WriteLocksBuilder {
1735    /// Adds a lock to this builder.
1736    pub fn insert_lock(&mut self, id: CatalogItemId, lock: tokio::sync::OwnedMutexGuard<()>) {
1737        self.locks.insert(id, Some(lock));
1738    }
1739
1740    /// Finish this builder by returning either all of the necessary locks, or none of them.
1741    ///
1742    /// If we fail to acquire all of the locks, returns one of the [`CatalogItemId`]s that we
1743    /// failed to acquire a lock for, that should be awaited so we know when to run again.
1744    pub fn all_or_nothing(self, conn_id: &ConnectionId) -> Result<WriteLocks, CatalogItemId> {
1745        let (locks, missing): (BTreeMap<_, _>, BTreeSet<_>) =
1746            self.locks
1747                .into_iter()
1748                .partition_map(|(gid, lock)| match lock {
1749                    Some(lock) => itertools::Either::Left((gid, lock)),
1750                    None => itertools::Either::Right(gid),
1751                });
1752
1753        match missing.iter().next() {
1754            None => {
1755                tracing::info!(%conn_id, ?locks, "acquired write locks");
1756                Ok(WriteLocks {
1757                    locks,
1758                    conn_id: conn_id.clone(),
1759                })
1760            }
1761            Some(gid) => {
1762                tracing::info!(?missing, "failed to acquire write locks");
1763                // Explicitly drop the already acquired locks.
1764                drop(locks);
1765                Err(*gid)
1766            }
1767        }
1768    }
1769}
1770
1771/// Collection of [`WriteLocks`] gathered during [`group_commit`].
1772///
1773/// Note: This struct should __never__ be used outside of group commit because it attempts to merge
1774/// together several collections of [`WriteLocks`] which if not done carefully can cause deadlocks
1775/// or consistency violations.
1776///
1777/// We must prevent writes from occurring to tables during read then write plans (e.g. `UPDATE`)
1778/// but we can allow blind writes (e.g. `INSERT`) to get committed concurrently at the same
1779/// timestamp when submitting the updates from a read then write plan.
1780///
1781/// Naively it would seem as though we could allow blind writes to occur whenever as blind writes
1782/// could never cause invalid retractions, but it could cause us to violate serializability because
1783/// there is no total order we could define for the transactions. Consider the following scenario:
1784///
1785/// ```text
1786/// table: foo
1787///
1788///  a | b
1789/// --------
1790///  x   2
1791///  y   3
1792///  z   4
1793///
1794/// -- Session(A)
1795/// -- read then write plan, reads at t0, writes at t3, transaction Ta
1796/// DELETE FROM foo WHERE b % 2 = 0;
1797///
1798///
1799/// -- Session(B)
1800/// -- blind write into foo, writes at t1, transaction Tb
1801/// INSERT INTO foo VALUES ('q', 6);
1802/// -- select from foo, reads at t2, transaction Tc
1803/// SELECT * FROM foo;
1804///
1805///
1806/// The times these operations occur at are ordered:
1807/// t0 < t1 < t2 < t3
1808///
1809/// Given the timing of the operations, the transactions must have the following order:
1810///
1811/// * Ta does not observe ('q', 6), so Ta < Tb
1812/// * Tc does observe ('q', 6), so Tb < Tc
1813/// * Tc does not observe the retractions from Ta, so Tc < Ta
1814///
1815/// For total order to exist, Ta < Tb < Tc < Ta, which is impossible.
1816/// ```
1817///
1818/// [`group_commit`]: super::coord::Coordinator::group_commit
1819#[derive(Debug, Default)]
1820pub(crate) struct GroupCommitWriteLocks {
1821    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
1822}
1823
1824impl GroupCommitWriteLocks {
1825    /// Merge a set of [`WriteLocks`] into this collection for group commit.
1826    pub fn merge(&mut self, mut locks: WriteLocks) {
1827        // Note: Ideally we would use `.drain`, but that method doesn't exist for BTreeMap.
1828        //
1829        // See: <https://github.com/rust-lang/rust/issues/81074>
1830        let existing = std::mem::take(&mut locks.locks);
1831        self.locks.extend(existing);
1832    }
1833
1834    /// Returns the collections we're missing locks for, if any.
1835    pub fn missing_locks(
1836        &self,
1837        writes: impl Iterator<Item = CatalogItemId>,
1838    ) -> BTreeSet<CatalogItemId> {
1839        let mut missing = BTreeSet::new();
1840        for write in writes {
1841            if !self.locks.contains_key(&write) {
1842                missing.insert(write);
1843            }
1844        }
1845        missing
1846    }
1847}
1848
1849impl Drop for GroupCommitWriteLocks {
1850    fn drop(&mut self) {
1851        if !self.locks.is_empty() {
1852            tracing::info!(
1853                locks = ?self.locks,
1854                "dropping group commit write locks",
1855            );
1856        }
1857    }
1858}