Skip to main content

mz_adapter/
session.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Per-connection configuration parameters and state.
11
12#![warn(missing_docs)]
13
14use std::collections::btree_map::Entry;
15use std::collections::{BTreeMap, BTreeSet};
16use std::future::Future;
17use std::mem;
18use std::net::IpAddr;
19use std::pin::Pin;
20use std::sync::Arc;
21
22use chrono::{DateTime, Utc};
23use derivative::Derivative;
24use itertools::Itertools;
25use mz_adapter_types::connection::ConnectionId;
26use mz_auth::AuthenticatorKind;
27use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
28use mz_controller_types::ClusterId;
29use mz_ore::metrics::{MetricsFutureExt, MetricsRegistry};
30use mz_ore::now::{EpochMillis, NowFn};
31use mz_pgwire_common::Format;
32use mz_repr::role_id::RoleId;
33use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
34use mz_repr::{CatalogItemId, Datum, Row, RowIterator, SqlScalarType, Timestamp};
35use mz_sql::ast::{AstInfo, Raw, Statement, TransactionAccessMode};
36use mz_sql::plan::{Params, PlanContext, QueryWhen, StatementDesc};
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::user::{
39    INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER, RoleMetadata, SYSTEM_USER, User,
40};
41use mz_sql::session::vars::IsolationLevel;
42pub use mz_sql::session::vars::{
43    DEFAULT_DATABASE_NAME, EndTransactionAction, SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION,
44    SERVER_PATCH_VERSION, SessionVars, Var,
45};
46use mz_sql_parser::ast::TransactionIsolationLevel;
47use mz_storage_client::client::TableData;
48use mz_storage_types::sources::Timeline;
49use qcell::{QCell, QCellOwner};
50use timely::progress::Timestamp as _;
51use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
52use tokio::sync::watch;
53use uuid::Uuid;
54
55use crate::catalog::CatalogState;
56use crate::client::RecordFirstRowStream;
57use crate::coord::appends::BuiltinTableAppendNotify;
58use crate::coord::in_memory_oracle::InMemoryTimestampOracle;
59use crate::coord::peek::PeekResponseUnary;
60use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
61use crate::coord::{Coordinator, ExplainContext};
62use crate::error::AdapterError;
63use crate::metrics::{Metrics, SessionMetrics};
64use crate::statement_logging::PreparedStatementLoggingInfo;
65use crate::{AdapterNotice, ExecuteContext};
66use mz_catalog::durable::Snapshot;
67
/// Connection ID reserved for sessions created by `Session::dummy`.
/// `Session::new` asserts that callers never pass this ID.
const DUMMY_CONNECTION_ID: ConnectionId = ConnectionId::Static(0);
69
/// A session holds per-connection state.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Session {
    /// The connection ID of this session. Unlike `uuid`, it may be reused
    /// after the session terminates.
    conn_id: ConnectionId,
    /// A globally unique identifier for the session. Not to be confused
    /// with `conn_id`, which may be reused.
    uuid: Uuid,
    /// Prepared statements registered on this session, keyed by name.
    prepared_statements: BTreeMap<String, PreparedStatement>,
    /// Portals bound on this session, keyed by portal name. All portals are
    /// destroyed whenever the transaction is cleared.
    portals: BTreeMap<String, Portal>,
    /// The status of the session's current transaction.
    transaction: TransactionStatus,
    // NOTE(review): in the `Session` methods reviewed here this field is only
    // initialized to `None` and reset to `None` by `clear_transaction`; the plan
    // context returned by `SessionMetadata::pcx` is read from the active
    // transaction instead. Confirm whether this field is still needed.
    pcx: Option<PlanContext>,
    /// Metrics handles for this session.
    metrics: SessionMetrics,
    // NOTE(review): initialized to `None`; presumably a notification for
    // pending builtin table appends made on behalf of this session — confirm
    // against its users. Excluded from the `Debug` output.
    #[derivative(Debug = "ignore")]
    builtin_updates: Option<BuiltinTableAppendNotify>,

    /// The role metadata of the current session.
    ///
    /// Invariant: role_metadata must be `Some` after the user has
    /// successfully connected to and authenticated with Materialize.
    ///
    /// Prefer using this value over [`SessionConfig::user`].
    //
    // It would be better for this not to be an Option, but the
    // `Session` is initialized before the user has connected to
    // Materialize and is able to look up the `RoleMetadata`. The `Session`
    // is also used to return an error when no role exists and
    // therefore there is no valid `RoleMetadata`.
    role_metadata: Option<RoleMetadata>,
    /// The peer address of the client, if known.
    client_ip: Option<IpAddr>,
    /// The session's variables.
    vars: SessionVars,
    /// Sending half of the channel that queues notices for the client. The
    /// session keeps this sender alive, so receiving on `notices_rx` never
    /// observes a closed channel.
    notices_tx: mpsc::UnboundedSender<AdapterNotice>,
    /// Receiving half of the notice channel; drained by `recv_notice` and
    /// `drain_notices`.
    notices_rx: mpsc::UnboundedReceiver<AdapterNotice>,
    /// The ID handed to the next transaction this session starts. Advanced
    /// with wrapping arithmetic.
    next_transaction_id: TransactionId,
    /// A secret key generated randomly when the session is created.
    // NOTE(review): presumably the key clients use for query cancellation —
    // confirm against its users.
    secret_key: u32,
    /// Optional watch channel on which updates to the user's external
    /// metadata arrive.
    external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    // Token allowing us to access `Arc<QCell<StatementLogging>>`
    // metadata. We want these to be reference-counted, because the same
    // statement might be referenced from multiple portals simultaneously.
    //
    // However, they can't be `Rc<RefCell<StatementLogging>>`, because
    // the `Session` is sent around to different threads.
    //
    // On the other hand, they don't need to be
    // `Arc<Mutex<StatementLogging>>`, because they will always be
    // accessed from the same thread that the `Session` is currently
    // on. We express this by gating access with this token.
    #[derivative(Debug = "ignore")]
    qcell_owner: QCellOwner,
    // NOTE(review): per-timeline in-memory timestamp oracles owned by this
    // session; starts empty. Presumably used for session-local timelines —
    // confirm against its users.
    session_oracles: BTreeMap<Timeline, InMemoryTimestampOracle>,
    /// Incremented when session state that is relevant to prepared statement planning changes.
    /// Currently, only changes to `portals` are tracked. Changes to `prepared_statements` don't
    /// need to be tracked, because prepared statements can't depend on other prepared statements.
    /// TODO: We might want to track changes also to session variables.
    /// (`Catalog::transient_revision` similarly tracks changes on the catalog side.)
    state_revision: u64,
}
127
128impl SessionMetadata for Session {
129    fn conn_id(&self) -> &ConnectionId {
130        &self.conn_id
131    }
132
133    fn client_ip(&self) -> Option<&IpAddr> {
134        self.client_ip.as_ref()
135    }
136
137    fn pcx(&self) -> &PlanContext {
138        &self
139            .transaction()
140            .inner()
141            .expect("no active transaction")
142            .pcx
143    }
144
145    fn role_metadata(&self) -> &RoleMetadata {
146        self.role_metadata
147            .as_ref()
148            .expect("role_metadata invariant violated")
149    }
150
151    fn vars(&self) -> &SessionVars {
152        &self.vars
153    }
154}
155
/// Data structure suitable for passing to other threads that need access to some common Session
/// properties.
///
/// Built by `Session::meta`, which copies each of these values out of the session.
#[derive(Debug)]
pub struct SessionMeta {
    /// The connection ID of the originating session.
    conn_id: ConnectionId,
    /// The client's peer address, if known.
    client_ip: Option<IpAddr>,
    /// The plan context of the originating session's active transaction at
    /// the time this value was built.
    pcx: PlanContext,
    /// The role metadata of the originating session.
    role_metadata: RoleMetadata,
    /// A copy of the originating session's variables.
    vars: SessionVars,
}
166
167impl SessionMetadata for SessionMeta {
168    fn vars(&self) -> &SessionVars {
169        &self.vars
170    }
171
172    fn conn_id(&self) -> &ConnectionId {
173        &self.conn_id
174    }
175
176    fn client_ip(&self) -> Option<&IpAddr> {
177        self.client_ip.as_ref()
178    }
179
180    fn pcx(&self) -> &PlanContext {
181        &self.pcx
182    }
183
184    fn role_metadata(&self) -> &RoleMetadata {
185        &self.role_metadata
186    }
187}
188
/// Configures a new [`Session`].
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// The connection ID for the session.
    ///
    /// May be reused after the session terminates.
    pub conn_id: ConnectionId,
    /// A universally unique identifier for the session, across all processes,
    /// regions, and all time.
    ///
    /// Must not be reused, even after the session terminates.
    pub uuid: Uuid,
    /// The peer address of the client, if known.
    pub client_ip: Option<IpAddr>,
    /// The name of the user associated with the session.
    pub user: String,
    /// An optional receiver that the session will periodically check for
    /// updates to a user's external metadata.
    pub external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    /// The Helm chart version, if any. It is handed to the session's
    /// variables when the session is created.
    pub helm_chart_version: Option<String>,
    /// The authenticator that authenticated this user, or
    /// `AuthenticatorKind::None` if there was none.
    pub authenticator_kind: AuthenticatorKind,
    /// Groups from JWT claims for OIDC group-to-role sync.
    pub groups: Option<Vec<String>>,
}
215
216impl Session {
217    /// Creates a new session for the specified connection ID.
218    pub(crate) fn new(
219        build_info: &'static BuildInfo,
220        config: SessionConfig,
221        metrics: SessionMetrics,
222    ) -> Session {
223        assert_ne!(config.conn_id, DUMMY_CONNECTION_ID);
224        Self::new_internal(build_info, config, metrics)
225    }
226
227    /// Returns a reference-less collection of data usable by other tasks that don't have ownership
228    /// of the Session.
229    pub fn meta(&self) -> SessionMeta {
230        SessionMeta {
231            conn_id: self.conn_id().clone(),
232            client_ip: self.client_ip().copied(),
233            pcx: self.pcx().clone(),
234            role_metadata: self.role_metadata().clone(),
235            vars: self.vars.clone(),
236        }
237
238        // TODO: soft_assert that these are the same as Session.
239    }
240
241    /// Creates new statement logging metadata for a one-off
242    /// statement.
243    // Normally, such logging information would be created as part of
244    // allocating a new prepared statement, and a refcounted handle
245    // would be copied from that prepared statement to portals during
246    // binding. However, we also support (via `Command::declare`)
247    // binding a statement directly to a portal without creating an
248    // intermediate prepared statement. Thus, for those cases, a
249    // mechanism for generating the logging metadata directly is needed.
250    pub(crate) fn mint_logging<A: AstInfo>(
251        &self,
252        raw_sql: String,
253        stmt: Option<&Statement<A>>,
254        now: EpochMillis,
255    ) -> Arc<QCell<PreparedStatementLoggingInfo>> {
256        Arc::new(QCell::new(
257            &self.qcell_owner,
258            PreparedStatementLoggingInfo::still_to_log(
259                raw_sql,
260                stmt,
261                now,
262                "".to_string(),
263                self.uuid,
264                false,
265            ),
266        ))
267    }
268
269    pub(crate) fn qcell_ro<'a, T2: 'a>(&'a self, cell: &'a Arc<QCell<T2>>) -> &'a T2 {
270        self.qcell_owner.ro(&*cell)
271    }
272
273    pub(crate) fn qcell_rw<'a, T2: 'a>(&'a mut self, cell: &'a Arc<QCell<T2>>) -> &'a mut T2 {
274        self.qcell_owner.rw(&*cell)
275    }
276
277    /// Returns a unique ID for the session.
278    /// Not to be confused with `connection_id`, which can be reused.
279    pub fn uuid(&self) -> Uuid {
280        self.uuid
281    }
282
283    /// Creates a new dummy session.
284    ///
285    /// Dummy sessions are intended for use when executing queries on behalf of
286    /// the system itself, rather than on behalf of a user.
287    pub fn dummy() -> Session {
288        let registry = MetricsRegistry::new();
289        let metrics = Metrics::register_into(&registry);
290        let metrics = metrics.session_metrics();
291        let mut dummy = Self::new_internal(
292            &DUMMY_BUILD_INFO,
293            SessionConfig {
294                conn_id: DUMMY_CONNECTION_ID,
295                uuid: Uuid::new_v4(),
296                user: SYSTEM_USER.name.clone(),
297                client_ip: None,
298                external_metadata_rx: None,
299                helm_chart_version: None,
300                authenticator_kind: AuthenticatorKind::None,
301                groups: None,
302            },
303            metrics,
304        );
305        dummy.initialize_role_metadata(RoleId::User(0));
306        dummy
307    }
308
309    fn new_internal(
310        build_info: &'static BuildInfo,
311        SessionConfig {
312            conn_id,
313            uuid,
314            user,
315            client_ip,
316            mut external_metadata_rx,
317            helm_chart_version,
318            authenticator_kind,
319            groups,
320        }: SessionConfig,
321        metrics: SessionMetrics,
322    ) -> Session {
323        let (notices_tx, notices_rx) = mpsc::unbounded_channel();
324        let default_cluster = INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER.get(&user);
325        let user = User {
326            name: user,
327            internal_metadata: None,
328            external_metadata: external_metadata_rx
329                .as_mut()
330                .map(|rx| rx.borrow_and_update().clone()),
331            authenticator_kind: Some(authenticator_kind),
332            groups,
333        };
334        let mut vars = SessionVars::new_unchecked(build_info, user, helm_chart_version);
335        if let Some(default_cluster) = default_cluster {
336            vars.set_cluster(default_cluster.clone());
337        }
338        Session {
339            conn_id,
340            uuid,
341            transaction: TransactionStatus::Default,
342            pcx: None,
343            metrics,
344            builtin_updates: None,
345            prepared_statements: BTreeMap::new(),
346            portals: BTreeMap::new(),
347            role_metadata: None,
348            client_ip,
349            vars,
350            notices_tx,
351            notices_rx,
352            next_transaction_id: 0,
353            secret_key: rand::random(),
354            external_metadata_rx,
355            qcell_owner: QCellOwner::new(),
356            session_oracles: BTreeMap::new(),
357            state_revision: 0,
358        }
359    }
360
361    /// Returns the secret key associated with the session.
362    pub fn secret_key(&self) -> u32 {
363        self.secret_key
364    }
365
366    fn new_pcx(&self, mut wall_time: DateTime<Utc>) -> PlanContext {
367        if let Some(mock_time) = self.vars().unsafe_new_transaction_wall_time() {
368            wall_time = *mock_time;
369        }
370        PlanContext::new(wall_time)
371    }
372
373    /// Starts an explicit transaction, or changes an implicit to an explicit
374    /// transaction.
375    pub fn start_transaction(
376        &mut self,
377        wall_time: DateTime<Utc>,
378        access: Option<TransactionAccessMode>,
379        isolation_level: Option<TransactionIsolationLevel>,
380    ) -> Result<(), AdapterError> {
381        // Check that current transaction state is compatible with new `access`
382        if let Some(txn) = self.transaction.inner() {
383            // `READ WRITE` prohibited if:
384            // - Currently in `READ ONLY`
385            // - Already performed a query
386            let read_write_prohibited = match txn.ops {
387                TransactionOps::Peeks { .. } | TransactionOps::Subscribe => {
388                    txn.access == Some(TransactionAccessMode::ReadOnly)
389                }
390                TransactionOps::None
391                | TransactionOps::Writes(_)
392                | TransactionOps::SingleStatement { .. }
393                | TransactionOps::DDL { .. } => false,
394            };
395
396            if read_write_prohibited && access == Some(TransactionAccessMode::ReadWrite) {
397                return Err(AdapterError::ReadWriteUnavailable);
398            }
399        }
400
401        match std::mem::take(&mut self.transaction) {
402            TransactionStatus::Default => {
403                let id = self.next_transaction_id;
404                self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
405                self.transaction = TransactionStatus::InTransaction(Transaction {
406                    pcx: self.new_pcx(wall_time),
407                    ops: TransactionOps::None,
408                    write_lock_guards: None,
409                    access,
410                    id,
411                });
412            }
413            TransactionStatus::Started(mut txn)
414            | TransactionStatus::InTransactionImplicit(mut txn)
415            | TransactionStatus::InTransaction(mut txn) => {
416                if access.is_some() {
417                    txn.access = access;
418                }
419                self.transaction = TransactionStatus::InTransaction(txn);
420            }
421            TransactionStatus::Failed(_) => unreachable!(),
422        };
423
424        if let Some(isolation_level) = isolation_level {
425            self.vars
426                .set_local_transaction_isolation(isolation_level.into());
427        }
428
429        Ok(())
430    }
431
432    /// Starts either a single statement or implicit transaction based on the
433    /// number of statements, but only if no transaction has been started already.
434    pub fn start_transaction_implicit(&mut self, wall_time: DateTime<Utc>, stmts: usize) {
435        if let TransactionStatus::Default = self.transaction {
436            let id = self.next_transaction_id;
437            self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
438            let txn = Transaction {
439                pcx: self.new_pcx(wall_time),
440                ops: TransactionOps::None,
441                write_lock_guards: None,
442                access: None,
443                id,
444            };
445            match stmts {
446                1 => self.transaction = TransactionStatus::Started(txn),
447                n if n > 1 => self.transaction = TransactionStatus::InTransactionImplicit(txn),
448                _ => {}
449            }
450        }
451    }
452
453    /// Starts a single statement transaction, but only if no transaction has been started already.
454    pub fn start_transaction_single_stmt(&mut self, wall_time: DateTime<Utc>) {
455        self.start_transaction_implicit(wall_time, 1);
456    }
457
458    /// Clears a transaction, setting its state to Default and destroying all
459    /// portals. Returned are:
460    /// - sinks that were started in this transaction and need to be dropped
461    /// - the cleared transaction so its operations can be handled
462    ///
463    /// The [Postgres protocol docs](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) specify:
464    /// > a named portal object lasts till the end of the current transaction
465    /// and
466    /// > An unnamed portal is destroyed at the end of the transaction
467    #[must_use]
468    pub fn clear_transaction(&mut self) -> TransactionStatus {
469        self.portals.clear();
470        self.pcx = None;
471        self.state_revision += 1;
472        mem::take(&mut self.transaction)
473    }
474
475    /// Marks the current transaction as failed.
476    pub fn fail_transaction(mut self) -> Self {
477        match self.transaction {
478            TransactionStatus::Default => unreachable!(),
479            TransactionStatus::Started(txn)
480            | TransactionStatus::InTransactionImplicit(txn)
481            | TransactionStatus::InTransaction(txn) => {
482                self.transaction = TransactionStatus::Failed(txn);
483            }
484            TransactionStatus::Failed(_) => {}
485        };
486        self
487    }
488
489    /// Returns the current transaction status.
490    pub fn transaction(&self) -> &TransactionStatus {
491        &self.transaction
492    }
493
494    /// Returns the current transaction status.
495    pub fn transaction_mut(&mut self) -> &mut TransactionStatus {
496        &mut self.transaction
497    }
498
499    /// Returns the session's transaction code.
500    pub fn transaction_code(&self) -> TransactionCode {
501        self.transaction().into()
502    }
503
504    /// Adds operations to the current transaction. An error is produced if
505    /// they cannot be merged (i.e., a timestamp-dependent read cannot be
506    /// merged to an insert).
507    pub fn add_transaction_ops(&mut self, add_ops: TransactionOps) -> Result<(), AdapterError> {
508        self.transaction.add_ops(add_ops)
509    }
510
511    /// Returns a channel on which to send notices to the session.
512    pub fn retain_notice_transmitter(&self) -> UnboundedSender<AdapterNotice> {
513        self.notices_tx.clone()
514    }
515
516    /// Adds a notice to the session.
517    pub fn add_notice(&self, notice: AdapterNotice) {
518        self.add_notices([notice])
519    }
520
521    /// Adds multiple notices to the session.
522    pub fn add_notices(&self, notices: impl IntoIterator<Item = AdapterNotice>) {
523        for notice in notices {
524            let _ = self.notices_tx.send(notice);
525        }
526    }
527
528    /// Awaits a possible notice.
529    ///
530    /// This method is cancel safe.
531    pub async fn recv_notice(&mut self) -> AdapterNotice {
532        // This method is cancel safe because recv is cancel safe.
533        loop {
534            let notice = self
535                .notices_rx
536                .recv()
537                .await
538                .expect("Session also holds a sender, so recv won't ever return None");
539            match self.notice_filter(notice) {
540                Some(notice) => return notice,
541                None => continue,
542            }
543        }
544    }
545
546    /// Returns a draining iterator over the notices attached to the session.
547    pub fn drain_notices(&mut self) -> Vec<AdapterNotice> {
548        let mut notices = Vec::new();
549        while let Ok(notice) = self.notices_rx.try_recv() {
550            if let Some(notice) = self.notice_filter(notice) {
551                notices.push(notice);
552            }
553        }
554        notices
555    }
556
557    /// Returns Some if the notice should be reported, otherwise None.
558    fn notice_filter(&self, notice: AdapterNotice) -> Option<AdapterNotice> {
559        // Filter out low threshold severity.
560        let minimum_client_severity = self.vars.client_min_messages();
561        let sev = notice.severity();
562        if !minimum_client_severity.should_output_to_client(&sev) {
563            return None;
564        }
565        // Filter out notices for other clusters.
566        if let AdapterNotice::ClusterReplicaStatusChanged { cluster, .. } = &notice {
567            if cluster != self.vars.cluster() {
568                return None;
569            }
570        }
571        Some(notice)
572    }
573
574    /// Sets the transaction ops to `TransactionOps::None`. Must only be used after
575    /// verifying that no transaction anomalies will occur if cleared.
576    pub fn clear_transaction_ops(&mut self) {
577        if let Some(txn) = self.transaction.inner_mut() {
578            txn.ops = TransactionOps::None;
579        }
580    }
581
582    /// If the current transaction ops belong to a read, then sets the
583    /// ops to `None`, returning the old read timestamp context if
584    /// any existed. Must only be used after verifying that no transaction
585    /// anomalies will occur if cleared.
586    pub fn take_transaction_timestamp_context(&mut self) -> Option<TimestampContext> {
587        if let Some(Transaction { ops, .. }) = self.transaction.inner_mut() {
588            if let TransactionOps::Peeks { .. } = ops {
589                let ops = std::mem::take(ops);
590                Some(
591                    ops.timestamp_determination()
592                        .expect("checked above")
593                        .timestamp_context,
594                )
595            } else {
596                None
597            }
598        } else {
599            None
600        }
601    }
602
603    /// Returns the transaction's read timestamp determination, if set.
604    ///
605    /// Returns `None` if there is no active transaction, or if the active
606    /// transaction is not a read transaction.
607    pub fn get_transaction_timestamp_determination(&self) -> Option<TimestampDetermination> {
608        match self.transaction.inner() {
609            Some(Transaction {
610                pcx: _,
611                ops: TransactionOps::Peeks { determination, .. },
612                write_lock_guards: _,
613                access: _,
614                id: _,
615            }) => Some(determination.clone()),
616            _ => None,
617        }
618    }
619
620    /// Whether this session has a timestamp for a read transaction.
621    pub fn contains_read_timestamp(&self) -> bool {
622        matches!(
623            self.transaction.inner(),
624            Some(Transaction {
625                pcx: _,
626                ops: TransactionOps::Peeks {
627                    determination: TimestampDetermination {
628                        timestamp_context: TimestampContext::TimelineTimestamp { .. },
629                        ..
630                    },
631                    ..
632                },
633                write_lock_guards: _,
634                access: _,
635                id: _,
636            })
637        )
638    }
639
640    /// Registers the prepared statement under `name`.
641    pub fn set_prepared_statement(
642        &mut self,
643        name: String,
644        stmt: Option<Statement<Raw>>,
645        raw_sql: String,
646        desc: StatementDesc,
647        state_revision: StateRevision,
648        now: EpochMillis,
649    ) {
650        let logging = PreparedStatementLoggingInfo::still_to_log(
651            raw_sql,
652            stmt.as_ref(),
653            now,
654            name.clone(),
655            self.uuid,
656            false,
657        );
658        let statement = PreparedStatement {
659            stmt,
660            desc,
661            state_revision,
662            logging: Arc::new(QCell::new(&self.qcell_owner, logging)),
663        };
664        self.prepared_statements.insert(name, statement);
665    }
666
667    /// Removes the prepared statement associated with `name`.
668    ///
669    /// Returns whether a statement previously existed.
670    pub fn remove_prepared_statement(&mut self, name: &str) -> bool {
671        self.prepared_statements.remove(name).is_some()
672    }
673
674    /// Removes all prepared statements.
675    pub fn remove_all_prepared_statements(&mut self) {
676        self.prepared_statements.clear();
677    }
678
679    /// Retrieves the prepared statement associated with `name`.
680    ///
681    /// This is unverified and could be incorrect if the underlying catalog has
682    /// changed.
683    pub fn get_prepared_statement_unverified(&self, name: &str) -> Option<&PreparedStatement> {
684        self.prepared_statements.get(name)
685    }
686
687    /// Retrieves the prepared statement associated with `name`.
688    ///
689    /// This is unverified and could be incorrect if the underlying catalog has
690    /// changed.
691    pub fn get_prepared_statement_mut_unverified(
692        &mut self,
693        name: &str,
694    ) -> Option<&mut PreparedStatement> {
695        self.prepared_statements.get_mut(name)
696    }
697
698    /// Returns the prepared statements for the session.
699    pub fn prepared_statements(&self) -> &BTreeMap<String, PreparedStatement> {
700        &self.prepared_statements
701    }
702
703    /// Returns the portals for the session.
704    pub fn portals(&self) -> &BTreeMap<String, Portal> {
705        &self.portals
706    }
707
708    /// Binds the specified portal to the specified prepared statement.
709    ///
710    /// If the prepared statement contains parameters, the values and types of
711    /// those parameters must be provided in `params`. It is the caller's
712    /// responsibility to ensure that the correct number of parameters is
713    /// provided.
714    ///
715    /// The `results_formats` parameter sets the desired format of the results,
716    /// and is stored on the portal.
717    pub fn set_portal(
718        &mut self,
719        portal_name: String,
720        desc: StatementDesc,
721        stmt: Option<Statement<Raw>>,
722        logging: Arc<QCell<PreparedStatementLoggingInfo>>,
723        params: Vec<(Datum, SqlScalarType)>,
724        result_formats: Vec<Format>,
725        state_revision: StateRevision,
726    ) -> Result<(), AdapterError> {
727        // The empty portal can be silently replaced.
728        if !portal_name.is_empty() && self.portals.contains_key(&portal_name) {
729            return Err(AdapterError::DuplicateCursor(portal_name));
730        }
731        self.state_revision += 1;
732        let param_types = desc.param_types.clone();
733        self.portals.insert(
734            portal_name,
735            Portal {
736                stmt: stmt.map(Arc::new),
737                desc,
738                state_revision,
739                parameters: Params {
740                    datums: Row::pack(params.iter().map(|(d, _t)| d)),
741                    execute_types: params.into_iter().map(|(_d, t)| t).collect(),
742                    expected_types: param_types,
743                },
744                result_formats,
745                state: PortalState::NotStarted,
746                logging,
747                lifecycle_timestamps: None,
748            },
749        );
750        Ok(())
751    }
752
753    /// Removes the specified portal.
754    ///
755    /// If there is no such portal, this method does nothing. Returns whether that portal existed.
756    pub fn remove_portal(&mut self, portal_name: &str) -> bool {
757        self.state_revision += 1;
758        self.portals.remove(portal_name).is_some()
759    }
760
761    /// Retrieves a reference to the specified portal.
762    ///
763    /// If there is no such portal, returns `None`.
764    pub fn get_portal_unverified(&self, portal_name: &str) -> Option<&Portal> {
765        self.portals.get(portal_name)
766    }
767
768    /// Retrieves a mutable reference to the specified portal.
769    ///
770    /// If there is no such portal, returns `None`.
771    ///
772    /// Note: When using the returned `PortalRefMut`, there is no need to increment
773    /// `Session::state_revision`, because the portal's meaning is not changed.
774    pub fn get_portal_unverified_mut(&mut self, portal_name: &str) -> Option<PortalRefMut<'_>> {
775        self.portals.get_mut(portal_name).map(|p| PortalRefMut {
776            stmt: &p.stmt,
777            desc: &p.desc,
778            state_revision: &mut p.state_revision,
779            parameters: &mut p.parameters,
780            result_formats: &mut p.result_formats,
781            logging: &mut p.logging,
782            state: &mut p.state,
783            lifecycle_timestamps: &mut p.lifecycle_timestamps,
784        })
785    }
786
787    /// Creates and installs a new portal.
788    pub fn create_new_portal(
789        &mut self,
790        stmt: Option<Statement<Raw>>,
791        logging: Arc<QCell<PreparedStatementLoggingInfo>>,
792        desc: StatementDesc,
793        parameters: Params,
794        result_formats: Vec<Format>,
795        state_revision: StateRevision,
796    ) -> Result<String, AdapterError> {
797        self.state_revision += 1;
798
799        // See: https://github.com/postgres/postgres/blob/84f5c2908dad81e8622b0406beea580e40bb03ac/src/backend/utils/mmgr/portalmem.c#L234
800        for i in 0usize.. {
801            let name = format!("<unnamed portal {}>", i);
802            match self.portals.entry(name.clone()) {
803                Entry::Occupied(_) => continue,
804                Entry::Vacant(entry) => {
805                    entry.insert(Portal {
806                        stmt: stmt.map(Arc::new),
807                        desc,
808                        state_revision,
809                        parameters,
810                        result_formats,
811                        state: PortalState::NotStarted,
812                        logging,
813                        lifecycle_timestamps: None,
814                    });
815                    return Ok(name);
816                }
817            }
818        }
819
820        coord_bail!("unable to create a new portal");
821    }
822
823    /// Resets the session to its initial state. Returns sinks that need to be
824    /// dropped.
825    pub fn reset(&mut self) {
826        let _ = self.clear_transaction();
827        self.prepared_statements.clear();
828        self.vars.reset_all();
829    }
830
831    /// Returns the [application_name] that created this session.
832    ///
833    /// [application_name]: (https://www.postgresql.org/docs/current/runtime-config-logging.html#GUC-APPLICATION-NAME)
834    pub fn application_name(&self) -> &str {
835        self.vars.application_name()
836    }
837
838    /// Returns a reference to the variables in this session.
839    pub fn vars(&self) -> &SessionVars {
840        &self.vars
841    }
842
843    /// Returns a mutable reference to the variables in this session.
844    pub fn vars_mut(&mut self) -> &mut SessionVars {
845        &mut self.vars
846    }
847
848    /// Grants a set of write locks to this session's inner [`Transaction`].
849    ///
850    /// # Panics
851    /// If the inner transaction is idle. See [`TransactionStatus::try_grant_write_locks`].
852    ///
853    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
854        self.transaction.try_grant_write_locks(guards)
855    }
856
857    /// Drains any external metadata updates and applies the changes from the latest update.
858    pub fn apply_external_metadata_updates(&mut self) {
859        // If no sender is registered then there isn't anything to do.
860        let Some(rx) = &mut self.external_metadata_rx else {
861            return;
862        };
863
864        // If the value hasn't changed then return.
865        if !rx.has_changed().unwrap_or(false) {
866            return;
867        }
868
869        // Update our metadata! Note the short critical section (just a clone) to avoid blocking
870        // the sending side of this watch channel.
871        let metadata = rx.borrow_and_update().clone();
872        self.vars.set_external_user_metadata(metadata);
873    }
874
875    /// Applies the internal user metadata to the session.
876    pub fn apply_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
877        self.vars.set_internal_user_metadata(metadata);
878    }
879
880    /// Initializes the session's role metadata.
881    pub fn initialize_role_metadata(&mut self, role_id: RoleId) {
882        self.role_metadata = Some(RoleMetadata::new(role_id));
883    }
884
885    /// Ensures that a timestamp oracle exists for `timeline` and returns a mutable reference to
886    /// the timestamp oracle.
887    pub fn ensure_timestamp_oracle(&mut self, timeline: Timeline) -> &mut InMemoryTimestampOracle {
888        self.session_oracles.entry(timeline).or_insert_with(|| {
889            InMemoryTimestampOracle::new(Timestamp::minimum(), NowFn::from(Timestamp::minimum))
890        })
891    }
892
893    /// Ensures that a timestamp oracle exists for reads and writes from/to a local input and
894    /// returns a mutable reference to the timestamp oracle.
895    pub fn ensure_local_timestamp_oracle(&mut self) -> &mut InMemoryTimestampOracle {
896        self.ensure_timestamp_oracle(Timeline::EpochMilliseconds)
897    }
898
899    /// Returns a reference to the timestamp oracle for `timeline`.
900    pub fn get_timestamp_oracle(&self, timeline: &Timeline) -> Option<&InMemoryTimestampOracle> {
901        self.session_oracles.get(timeline)
902    }
903
904    /// If the current session is using the Strong Session Serializable isolation level advance the
905    /// session local timestamp oracle to `write_ts`.
906    pub fn apply_write(&mut self, timestamp: Timestamp) {
907        if self.vars().transaction_isolation() == &IsolationLevel::StrongSessionSerializable {
908            self.ensure_local_timestamp_oracle().apply_write(timestamp);
909        }
910    }
911
912    /// Returns the [`SessionMetrics`] instance associated with this [`Session`].
913    pub fn metrics(&self) -> &SessionMetrics {
914        &self.metrics
915    }
916
917    /// Sets the `BuiltinTableAppendNotify` for this session.
918    pub fn set_builtin_table_updates(&mut self, fut: BuiltinTableAppendNotify) {
919        let prev = self.builtin_updates.replace(fut);
920        mz_ore::soft_assert_or_log!(prev.is_none(), "replacing old builtin table notify");
921    }
922
923    /// Takes the stashed `BuiltinTableAppendNotify`, if one exists, and returns a [`Future`] that
924    /// waits for the writes to complete.
925    pub fn clear_builtin_table_updates(&mut self) -> Option<impl Future<Output = ()> + 'static> {
926        if let Some(fut) = self.builtin_updates.take() {
927            // Record how long we blocked for, if we blocked at all.
928            let histogram = self
929                .metrics()
930                .session_startup_table_writes_seconds()
931                .clone();
932            Some(async move {
933                fut.wall_time().observe(histogram).await;
934            })
935        } else {
936            None
937        }
938    }
939
940    /// Return the state_revision of the session, which can be used by dependent objects for knowing
941    /// when to re-plan due to session state changes.
942    pub fn state_revision(&self) -> u64 {
943        self.state_revision
944    }
945}
946
/// A prepared statement.
///
/// Cloneable; its `Debug` output omits the logging handle.
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct PreparedStatement {
    /// The parsed statement, or `None` if the prepared statement is the empty query.
    stmt: Option<Statement<Raw>>,
    /// The statement's description.
    desc: StatementDesc,
    /// The most recent state revision that has verified this statement.
    pub state_revision: StateRevision,
    /// Shared handle to the metadata used for statement logging (excluded from `Debug`).
    #[derivative(Debug = "ignore")]
    logging: Arc<QCell<PreparedStatementLoggingInfo>>,
}
958
959impl PreparedStatement {
960    /// Returns the AST associated with this prepared statement,
961    /// if the prepared statement was not the empty query.
962    pub fn stmt(&self) -> Option<&Statement<Raw>> {
963        self.stmt.as_ref()
964    }
965
966    /// Returns the description of the prepared statement.
967    pub fn desc(&self) -> &StatementDesc {
968        &self.desc
969    }
970
971    /// Returns a handle to the metadata for statement logging.
972    pub fn logging(&self) -> &Arc<QCell<PreparedStatementLoggingInfo>> {
973        &self.logging
974    }
975}
976
/// A portal represents the execution state of a running or runnable query.
///
/// Its `Debug` output omits the logging handle and the execution state.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Portal {
    /// The statement that is bound to this portal (shared via `Arc`).
    pub stmt: Option<Arc<Statement<Raw>>>,
    /// The statement description.
    pub desc: StatementDesc,
    /// The most recent state revision that has verified this portal.
    pub state_revision: StateRevision,
    /// The bound values for the parameters in the prepared statement, if any.
    pub parameters: Params,
    /// The desired output format for each column in the result set.
    pub result_formats: Vec<Format>,
    /// A handle to metadata needed for statement logging.
    #[derivative(Debug = "ignore")]
    pub logging: Arc<QCell<PreparedStatementLoggingInfo>>,
    /// The execution state of the portal.
    #[derivative(Debug = "ignore")]
    pub state: PortalState,
    /// Statement lifecycle timestamps coming from `mz-pgwire`, if they have been recorded.
    pub lifecycle_timestamps: Option<LifecycleTimestamps>,
}
1000
/// A mutable reference to a portal, capturing its state and associated metadata. Importantly, it
/// does _not_ give _mutable_ access to `stmt` and `desc`, which means that you do not need to
/// increment `Session::state_revision` when modifying fields through a `PortalRefMut`, because the
/// portal's meaning is not changed.
///
/// Each field borrows the corresponding field of a [`Portal`].
pub struct PortalRefMut<'a> {
    /// The statement that is bound to this portal (read-only).
    pub stmt: &'a Option<Arc<Statement<Raw>>>,
    /// The statement description (read-only).
    pub desc: &'a StatementDesc,
    /// The most recent state revision that has verified this portal.
    pub state_revision: &'a mut StateRevision,
    /// The bound values for the parameters in the prepared statement, if any.
    pub parameters: &'a mut Params,
    /// The desired output format for each column in the result set.
    pub result_formats: &'a mut Vec<Format>,
    /// A handle to metadata needed for statement logging.
    pub logging: &'a mut Arc<QCell<PreparedStatementLoggingInfo>>,
    /// The execution state of the portal.
    pub state: &'a mut PortalState,
    /// Statement lifecycle timestamps coming from `mz-pgwire`.
    pub lifecycle_timestamps: &'a mut Option<LifecycleTimestamps>,
}
1023
/// Points to a revision of catalog state and session state. When the current revisions are not the
/// same as the revisions when a prepared statement or a portal was described, we need to check
/// whether the description is still valid.
///
/// Both components are plain `u64` counters, so equality is a total equivalence and the type
/// implements [`Eq`] in addition to [`PartialEq`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StateRevision {
    /// A revision of the catalog.
    pub catalog_revision: u64,
    /// A revision of the session state.
    pub session_state_revision: u64,
}
1034
/// Execution states of a portal.
///
/// A portal created by `Session::create_new_portal` starts in `NotStarted`.
pub enum PortalState {
    /// Portal not yet started.
    NotStarted,
    /// Portal is a rows-returning statement in progress with 0 or more rows
    /// remaining.
    // NOTE(review): the meaning of the inner `None` is not visible here — confirm with callers.
    InProgress(Option<InProgressRows>),
    /// Portal has completed and should not be re-executed. If the optional string
    /// is present, it is returned as a CommandComplete tag, otherwise an error
    /// is sent.
    Completed(Option<String>),
}
1047
/// State of an in-progress, rows-returning portal.
pub struct InProgressRows {
    /// The current batch of rows, if one has been fetched and not yet fully consumed.
    pub current: Option<Box<dyn RowIterator + Send + Sync>>,
    /// A stream from which to fetch more row batches.
    pub remaining: RecordFirstRowStream,
}
1055
1056impl InProgressRows {
1057    /// Creates a new InProgressRows from a batch stream.
1058    pub fn new(remaining: RecordFirstRowStream) -> Self {
1059        Self {
1060            current: None,
1061            remaining,
1062        }
1063    }
1064
1065    /// Determines whether the underlying stream has ended and there are also no more rows
1066    /// stashed in `current`.
1067    pub fn no_more_rows(&self) -> bool {
1068        self.remaining.no_more_rows && self.current.is_none()
1069    }
1070}
1071
/// A channel of batched rows: the receiving end of an unbounded channel of
/// [`PeekResponseUnary`] messages.
pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
1074
/// Part of statement lifecycle. These are timestamps that come from the Adapter frontend
/// (`mz-pgwire`) part of the lifecycle.
#[derive(Debug, Clone)]
pub struct LifecycleTimestamps {
    /// When the query was received. More specifically, when the tokio recv returned.
    /// For a Simple Query, this covers the whole query; for the Extended Query flow, it covers
    /// only `FrontendMessage::Execute`. (This means that, for the Extended Query flow, it is
    /// taken after parsing.)
    pub received: EpochMillis,
}
1085
1086impl LifecycleTimestamps {
1087    /// Creates a new `LifecycleTimestamps`.
1088    pub fn new(received: EpochMillis) -> Self {
1089        Self { received }
1090    }
1091}
1092
/// The transaction status of a session.
///
/// PostgreSQL's transaction states are in backend/access/transam/xact.c.
///
/// Every variant except `Default` wraps the session's [`Transaction`]. `Default` is also the
/// value returned by `TransactionStatus::default()`.
#[derive(Debug)]
pub enum TransactionStatus {
    /// Idle. Matches `TBLOCK_DEFAULT`.
    Default,
    /// Running a single-query transaction. Matches
    /// `TBLOCK_STARTED`. In PostgreSQL, when using the extended query protocol, this
    /// may be upgraded into multi-statement implicit query (see [`Self::InTransactionImplicit`]).
    /// Additionally, some statements may trigger an eager commit of the implicit transaction,
    /// see: <https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137>. In
    /// Materialize however, we eagerly commit all statements outside of an explicit transaction
    /// when using the extended query protocol. Therefore, we can guarantee that this state will
    /// always be a single-query transaction and never be upgraded into a multi-statement implicit
    /// query.
    Started(Transaction),
    /// Currently in a transaction issued from a `BEGIN`. Matches `TBLOCK_INPROGRESS`.
    InTransaction(Transaction),
    /// Currently in an implicit transaction started from a multi-statement query
    /// with more than one statement. Matches `TBLOCK_IMPLICIT_INPROGRESS`.
    InTransactionImplicit(Transaction),
    /// In a failed transaction. Matches `TBLOCK_ABORT`.
    Failed(Transaction),
}
1118
1119impl TransactionStatus {
1120    /// Extracts the inner transaction ops and write lock guard if not failed.
1121    pub fn into_ops_and_lock_guard(self) -> (Option<TransactionOps>, Option<WriteLocks>) {
1122        match self {
1123            TransactionStatus::Default | TransactionStatus::Failed(_) => (None, None),
1124            TransactionStatus::Started(txn)
1125            | TransactionStatus::InTransaction(txn)
1126            | TransactionStatus::InTransactionImplicit(txn) => {
1127                (Some(txn.ops), txn.write_lock_guards)
1128            }
1129        }
1130    }
1131
1132    /// Exposes the inner transaction.
1133    pub fn inner(&self) -> Option<&Transaction> {
1134        match self {
1135            TransactionStatus::Default => None,
1136            TransactionStatus::Started(txn)
1137            | TransactionStatus::InTransaction(txn)
1138            | TransactionStatus::InTransactionImplicit(txn)
1139            | TransactionStatus::Failed(txn) => Some(txn),
1140        }
1141    }
1142
1143    /// Exposes the inner transaction.
1144    pub fn inner_mut(&mut self) -> Option<&mut Transaction> {
1145        match self {
1146            TransactionStatus::Default => None,
1147            TransactionStatus::Started(txn)
1148            | TransactionStatus::InTransaction(txn)
1149            | TransactionStatus::InTransactionImplicit(txn)
1150            | TransactionStatus::Failed(txn) => Some(txn),
1151        }
1152    }
1153
1154    /// Whether the transaction's ops are DDL.
1155    pub fn is_ddl(&self) -> bool {
1156        match self {
1157            TransactionStatus::Default => false,
1158            TransactionStatus::Started(txn)
1159            | TransactionStatus::InTransaction(txn)
1160            | TransactionStatus::InTransactionImplicit(txn)
1161            | TransactionStatus::Failed(txn) => {
1162                matches!(txn.ops, TransactionOps::DDL { .. })
1163            }
1164        }
1165    }
1166
1167    /// Expresses whether or not the transaction was implicitly started.
1168    /// However, its negation does not imply explicitly started.
1169    pub fn is_implicit(&self) -> bool {
1170        match self {
1171            TransactionStatus::Started(_) | TransactionStatus::InTransactionImplicit(_) => true,
1172            TransactionStatus::Default
1173            | TransactionStatus::InTransaction(_)
1174            | TransactionStatus::Failed(_) => false,
1175        }
1176    }
1177
1178    /// Whether the transaction may contain multiple statements.
1179    pub fn is_in_multi_statement_transaction(&self) -> bool {
1180        match self {
1181            TransactionStatus::InTransaction(_) | TransactionStatus::InTransactionImplicit(_) => {
1182                true
1183            }
1184            TransactionStatus::Default
1185            | TransactionStatus::Started(_)
1186            | TransactionStatus::Failed(_) => false,
1187        }
1188    }
1189
1190    /// Whether we are in a multi-statement transaction, AND the query is immediate.
1191    pub fn in_immediate_multi_stmt_txn(&self, when: &QueryWhen) -> bool {
1192        self.is_in_multi_statement_transaction() && when == &QueryWhen::Immediately
1193    }
1194
1195    /// Grants the writes lock to the inner transaction, returning an error if the transaction
1196    /// has already been granted write locks.
1197    ///
1198    /// # Panics
1199    /// If `self` is `TransactionStatus::Default`, which indicates that the
1200    /// transaction is idle, which is not appropriate to assign the
1201    /// coordinator's write lock to.
1202    ///
1203    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1204        match self {
1205            TransactionStatus::Default => panic!("cannot grant write lock to txn not yet started"),
1206            TransactionStatus::Started(txn)
1207            | TransactionStatus::InTransaction(txn)
1208            | TransactionStatus::InTransactionImplicit(txn)
1209            | TransactionStatus::Failed(txn) => txn.try_grant_write_locks(guards),
1210        }
1211    }
1212
1213    /// Returns the currently held [`WriteLocks`], if this transaction holds any.
1214    pub fn write_locks(&self) -> Option<&WriteLocks> {
1215        match self {
1216            TransactionStatus::Default => None,
1217            TransactionStatus::Started(txn)
1218            | TransactionStatus::InTransaction(txn)
1219            | TransactionStatus::InTransactionImplicit(txn)
1220            | TransactionStatus::Failed(txn) => txn.write_lock_guards.as_ref(),
1221        }
1222    }
1223
1224    /// The timeline of the transaction, if one exists.
1225    pub fn timeline(&self) -> Option<Timeline> {
1226        match self {
1227            TransactionStatus::Default => None,
1228            TransactionStatus::Started(txn)
1229            | TransactionStatus::InTransaction(txn)
1230            | TransactionStatus::InTransactionImplicit(txn)
1231            | TransactionStatus::Failed(txn) => txn.timeline(),
1232        }
1233    }
1234
1235    /// The cluster of the transaction, if one exists.
1236    pub fn cluster(&self) -> Option<ClusterId> {
1237        match self {
1238            TransactionStatus::Default => None,
1239            TransactionStatus::Started(txn)
1240            | TransactionStatus::InTransaction(txn)
1241            | TransactionStatus::InTransactionImplicit(txn)
1242            | TransactionStatus::Failed(txn) => txn.cluster(),
1243        }
1244    }
1245
1246    /// Snapshot of the catalog that reflects DDL operations run in this transaction.
1247    pub fn catalog_state(&self) -> Option<&CatalogState> {
1248        match self.inner() {
1249            Some(Transaction {
1250                ops: TransactionOps::DDL { state, .. },
1251                ..
1252            }) => Some(state),
1253            _ => None,
1254        }
1255    }
1256
1257    /// Reports whether any operations have been executed as part of this transaction
1258    pub fn contains_ops(&self) -> bool {
1259        match self.inner() {
1260            Some(txn) => txn.contains_ops(),
1261            None => false,
1262        }
1263    }
1264
1265    /// Checks whether the current state of this transaction allows writes
1266    /// (adding write ops).
1267    /// transaction
1268    pub fn allows_writes(&self) -> bool {
1269        match self {
1270            TransactionStatus::Started(Transaction { ops, access, .. })
1271            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1272            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1273                match ops {
1274                    TransactionOps::None => access != &Some(TransactionAccessMode::ReadOnly),
1275                    TransactionOps::Peeks { determination, .. } => {
1276                        // If-and-only-if peeks thus far do not have a timestamp
1277                        // (i.e. they are constant), we can switch to a write
1278                        // transaction.
1279                        !determination.timestamp_context.contains_timestamp()
1280                    }
1281                    TransactionOps::Subscribe => false,
1282                    TransactionOps::Writes(_) => true,
1283                    TransactionOps::SingleStatement { .. } => false,
1284                    TransactionOps::DDL { .. } => false,
1285                }
1286            }
1287            TransactionStatus::Default | TransactionStatus::Failed(_) => {
1288                unreachable!()
1289            }
1290        }
1291    }
1292
1293    /// Adds operations to the current transaction. An error is produced if they cannot be merged
1294    /// (i.e., a timestamp-dependent read cannot be merged to an insert).
1295    ///
1296    /// The `DDL` variant is an exception and does not merge operations, but instead overwrites the
1297    /// old ops with the new ops. This is correct because it is only used in conjunction with the
1298    /// Dry Run catalog op which returns an error containing all of the ops, and those ops are
1299    /// passed to this function which then overwrites.
1300    ///
1301    /// # Panics
1302    /// If the operations are compatible but the operation metadata doesn't match. Such as reads at
1303    /// different timestamps, reads on different timelines, reads on different clusters, etc. It's
1304    /// up to the caller to make sure these are aligned.
1305    pub fn add_ops(&mut self, add_ops: TransactionOps) -> Result<(), AdapterError> {
1306        match self {
1307            TransactionStatus::Started(Transaction { ops, access, .. })
1308            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1309            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1310                match ops {
1311                    TransactionOps::None => {
1312                        if matches!(access, Some(TransactionAccessMode::ReadOnly))
1313                            && matches!(add_ops, TransactionOps::Writes(_))
1314                        {
1315                            return Err(AdapterError::ReadOnlyTransaction);
1316                        }
1317                        *ops = add_ops;
1318                    }
1319                    TransactionOps::Peeks {
1320                        determination,
1321                        cluster_id,
1322                        requires_linearization,
1323                    } => match add_ops {
1324                        TransactionOps::Peeks {
1325                            determination: add_timestamp_determination,
1326                            cluster_id: add_cluster_id,
1327                            requires_linearization: add_requires_linearization,
1328                        } => {
1329                            assert_eq!(*cluster_id, add_cluster_id);
1330                            match (
1331                                &determination.timestamp_context,
1332                                &add_timestamp_determination.timestamp_context,
1333                            ) {
1334                                (
1335                                    TimestampContext::TimelineTimestamp {
1336                                        timeline: txn_timeline,
1337                                        chosen_ts: txn_ts,
1338                                        oracle_ts: _,
1339                                    },
1340                                    TimestampContext::TimelineTimestamp {
1341                                        timeline: add_timeline,
1342                                        chosen_ts: add_ts,
1343                                        oracle_ts: _,
1344                                    },
1345                                ) => {
1346                                    assert_eq!(txn_timeline, add_timeline);
1347                                    assert_eq!(txn_ts, add_ts);
1348                                }
1349                                (TimestampContext::NoTimestamp, _) => {
1350                                    *determination = add_timestamp_determination
1351                                }
1352                                (_, TimestampContext::NoTimestamp) => {}
1353                            };
1354                            if matches!(requires_linearization, RequireLinearization::NotRequired)
1355                                && matches!(
1356                                    add_requires_linearization,
1357                                    RequireLinearization::Required
1358                                )
1359                            {
1360                                *requires_linearization = add_requires_linearization;
1361                            }
1362                        }
1363                        // Iff peeks thus far do not have a timestamp (i.e.
1364                        // they are constant), we can switch to a write
1365                        // transaction.
1366                        writes @ TransactionOps::Writes(..)
1367                            if !determination.timestamp_context.contains_timestamp() =>
1368                        {
1369                            *ops = writes;
1370                        }
1371                        _ => return Err(AdapterError::ReadOnlyTransaction),
1372                    },
1373                    TransactionOps::Subscribe => {
1374                        return Err(AdapterError::SubscribeOnlyTransaction);
1375                    }
1376                    TransactionOps::Writes(txn_writes) => match add_ops {
1377                        TransactionOps::Writes(mut add_writes) => {
1378                            // We should have already checked the access above, but make sure we don't miss
1379                            // it anyway.
1380                            assert!(!matches!(access, Some(TransactionAccessMode::ReadOnly)));
1381                            txn_writes.append(&mut add_writes);
1382                        }
1383                        // Iff peeks do not have a timestamp (i.e. they are
1384                        // constant), we can permit them.
1385                        TransactionOps::Peeks { determination, .. }
1386                            if !determination.timestamp_context.contains_timestamp() => {}
1387                        _ => {
1388                            return Err(AdapterError::WriteOnlyTransaction);
1389                        }
1390                    },
1391                    TransactionOps::SingleStatement { .. } => {
1392                        return Err(AdapterError::SingleStatementTransaction);
1393                    }
1394                    TransactionOps::DDL {
1395                        ops: og_ops,
1396                        revision: og_revision,
1397                        state: og_state,
1398                        side_effects,
1399                        snapshot: og_snapshot,
1400                    } => match add_ops {
1401                        TransactionOps::DDL {
1402                            ops: new_ops,
1403                            revision: new_revision,
1404                            side_effects: mut net_new_side_effects,
1405                            state: new_state,
1406                            snapshot: new_snapshot,
1407                        } => {
1408                            if *og_revision != new_revision {
1409                                return Err(AdapterError::DDLTransactionRace);
1410                            }
1411                            // The old og_ops are overwritten, not extended.
1412                            if !new_ops.is_empty() {
1413                                *og_ops = new_ops;
1414                                *og_state = new_state;
1415                                *og_snapshot = new_snapshot;
1416                            }
1417                            side_effects.append(&mut net_new_side_effects);
1418                        }
1419                        _ => return Err(AdapterError::DDLOnlyTransaction),
1420                    },
1421                }
1422            }
1423            TransactionStatus::Default | TransactionStatus::Failed(_) => {
1424                unreachable!()
1425            }
1426        }
1427        Ok(())
1428    }
1429}
1430
/// An abstraction allowing us to identify different transactions.
///
/// IDs are only unique per connection; see [`Transaction::id`].
pub type TransactionId = u64;
1433
1434impl Default for TransactionStatus {
1435    fn default() -> Self {
1436        TransactionStatus::Default
1437    }
1438}
1439
/// State data for transactions.
#[derive(Debug)]
pub struct Transaction {
    /// Plan context.
    pub pcx: PlanContext,
    /// Transaction operations recorded so far.
    pub ops: TransactionOps,
    /// Uniquely identifies the transaction on a per connection basis.
    /// Two transactions started from separate connections may share the
    /// same ID.
    /// If all IDs have been exhausted, this will wrap around back to 0.
    pub id: TransactionId,
    /// Locks for objects this transaction will operate on. `None` until granted via
    /// `Transaction::try_grant_write_locks`, which never replaces locks once they are set.
    write_lock_guards: Option<WriteLocks>,
    /// Access mode (read only, read write), if one was specified.
    access: Option<TransactionAccessMode>,
}
1457
1458impl Transaction {
1459    /// Tries to grant the write lock to this transaction for the remainder of its lifetime. Errors
1460    /// if this [`Transaction`] has already been granted write locks.
1461    fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1462        match &mut self.write_lock_guards {
1463            Some(existing) => Err(existing),
1464            locks @ None => {
1465                *locks = Some(guards);
1466                Ok(())
1467            }
1468        }
1469    }
1470
1471    /// The timeline of the transaction, if one exists.
1472    fn timeline(&self) -> Option<Timeline> {
1473        match &self.ops {
1474            TransactionOps::Peeks {
1475                determination:
1476                    TimestampDetermination {
1477                        timestamp_context: TimestampContext::TimelineTimestamp { timeline, .. },
1478                        ..
1479                    },
1480                ..
1481            } => Some(timeline.clone()),
1482            TransactionOps::Peeks { .. }
1483            | TransactionOps::None
1484            | TransactionOps::Subscribe
1485            | TransactionOps::Writes(_)
1486            | TransactionOps::SingleStatement { .. }
1487            | TransactionOps::DDL { .. } => None,
1488        }
1489    }
1490
1491    /// The cluster of the transaction, if one exists.
1492    pub fn cluster(&self) -> Option<ClusterId> {
1493        match &self.ops {
1494            TransactionOps::Peeks { cluster_id, .. } => Some(cluster_id.clone()),
1495            TransactionOps::None
1496            | TransactionOps::Subscribe
1497            | TransactionOps::Writes(_)
1498            | TransactionOps::SingleStatement { .. }
1499            | TransactionOps::DDL { .. } => None,
1500        }
1501    }
1502
1503    /// Reports whether any operations have been executed as part of this transaction
1504    fn contains_ops(&self) -> bool {
1505        !matches!(self.ops, TransactionOps::None)
1506    }
1507}
1508
/// A transaction's status code.
///
/// Converts into a single ASCII byte (`u8`) or a one-character `String`.
#[derive(Debug, Clone, Copy)]
pub enum TransactionCode {
    /// Not currently in a transaction. Encoded as `b'I'`.
    Idle,
    /// Currently in a transaction. Encoded as `b'T'`.
    InTransaction,
    /// Currently in a transaction block which is failed. Encoded as `b'E'`.
    Failed,
}
1519
1520impl From<TransactionCode> for u8 {
1521    fn from(code: TransactionCode) -> Self {
1522        match code {
1523            TransactionCode::Idle => b'I',
1524            TransactionCode::InTransaction => b'T',
1525            TransactionCode::Failed => b'E',
1526        }
1527    }
1528}
1529
1530impl From<TransactionCode> for String {
1531    fn from(code: TransactionCode) -> Self {
1532        char::from(u8::from(code)).to_string()
1533    }
1534}
1535
1536impl From<&TransactionStatus> for TransactionCode {
1537    /// Convert from the Session's version
1538    fn from(status: &TransactionStatus) -> TransactionCode {
1539        match status {
1540            TransactionStatus::Default => TransactionCode::Idle,
1541            TransactionStatus::Started(_) => TransactionCode::InTransaction,
1542            TransactionStatus::InTransaction(_) => TransactionCode::InTransaction,
1543            TransactionStatus::InTransactionImplicit(_) => TransactionCode::InTransaction,
1544            TransactionStatus::Failed(_) => TransactionCode::Failed,
1545        }
1546    }
1547}
1548
1549/// The type of operation being performed by the transaction.
1550///
1551/// This is needed because we currently do not allow mixing reads and writes in
1552/// a transaction. Use this to record what we have done, and what may need to
1553/// happen at commit.
1554#[derive(Derivative)]
1555#[derivative(Debug)]
1556pub enum TransactionOps {
1557    /// The transaction has been initiated, but no statement has yet been executed
1558    /// in it.
1559    None,
1560    /// This transaction has had a peek (`SELECT`, `SUBSCRIBE`). If the inner value
1561    /// is has a timestamp, it must only do other peeks. However, if it doesn't
1562    /// have a timestamp (i.e. the values are constants), the transaction can still
1563    /// perform writes.
1564    Peeks {
1565        /// The timestamp and timestamp related metadata for the peek.
1566        determination: TimestampDetermination,
1567        /// The cluster used to execute peeks.
1568        cluster_id: ClusterId,
1569        /// Whether this peek needs to be linearized.
1570        requires_linearization: RequireLinearization,
1571    },
1572    /// This transaction has done a `SUBSCRIBE` and must do nothing else.
1573    Subscribe,
1574    /// This transaction has had a write (`INSERT`, `UPDATE`, `DELETE`) and must
1575    /// only do other writes, or reads whose timestamp is None (i.e. constants).
1576    Writes(Vec<WriteOp>),
1577    /// This transaction has a prospective statement that will execute during commit.
1578    SingleStatement {
1579        /// The prospective statement.
1580        stmt: Arc<Statement<Raw>>,
1581        /// The statement params.
1582        params: mz_sql::plan::Params,
1583    },
1584    /// This transaction has run some _simple_ DDL and must do nothing else. Any statement/plan that
1585    /// uses this must return false in `must_serialize_ddl()` because this is serialized instead in
1586    /// `sequence_plan()` during `COMMIT`.
1587    DDL {
1588        /// Catalog operations that have already run, and must run before each subsequent op.
1589        ops: Vec<crate::catalog::Op>,
1590        /// In-memory state that reflects the previously applied ops.
1591        state: CatalogState,
1592        /// A list of side effects that should be executed if this DDL transaction commits.
1593        #[derivative(Debug = "ignore")]
1594        side_effects: Vec<
1595            Box<
1596                dyn for<'a> FnOnce(
1597                        &'a mut Coordinator,
1598                        Option<&'a mut ExecuteContext>,
1599                    ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
1600                    + Send
1601                    + Sync,
1602            >,
1603        >,
1604        /// Transient revision of the `Catalog` when this transaction started.
1605        revision: u64,
1606        /// Snapshot of the durable transaction state after the last dry run.
1607        /// Used to initialize the next dry run's transaction so it starts
1608        /// in sync with the accumulated `state`. `None` for the first
1609        /// statement in the transaction (before any dry run).
1610        snapshot: Option<Snapshot>,
1611    },
1612}
1613
1614impl TransactionOps {
1615    fn timestamp_determination(self) -> Option<TimestampDetermination> {
1616        match self {
1617            TransactionOps::Peeks { determination, .. } => Some(determination),
1618            TransactionOps::None
1619            | TransactionOps::Subscribe
1620            | TransactionOps::Writes(_)
1621            | TransactionOps::SingleStatement { .. }
1622            | TransactionOps::DDL { .. } => None,
1623        }
1624    }
1625}
1626
1627impl Default for TransactionOps {
1628    fn default() -> Self {
1629        Self::None
1630    }
1631}
1632
1633/// An `INSERT` waiting to be committed.
1634#[derive(Debug, Clone, PartialEq)]
1635pub struct WriteOp {
1636    /// The target table.
1637    pub id: CatalogItemId,
1638    /// The data rows.
1639    pub rows: TableData,
1640}
1641
/// Records whether a transaction must be linearized.
#[derive(Debug)]
pub enum RequireLinearization {
    /// The transaction must be linearized.
    Required,
    /// The transaction may skip linearization.
    NotRequired,
}
1650
1651impl From<&ExplainContext> for RequireLinearization {
1652    fn from(ctx: &ExplainContext) -> Self {
1653        match ctx {
1654            ExplainContext::None | ExplainContext::PlanInsightsNotice(_) => {
1655                RequireLinearization::Required
1656            }
1657            _ => RequireLinearization::NotRequired,
1658        }
1659    }
1660}
1661
1662/// A complete set of exclusive locks for writing to collections identified by [`CatalogItemId`]s.
1663///
1664/// To prevent deadlocks between two sessions, we do not allow acquiring a partial set of locks.
1665#[derive(Debug)]
1666pub struct WriteLocks {
1667    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
1668    /// Connection that currently holds these locks, used for tracing purposes only.
1669    conn_id: ConnectionId,
1670}
1671
1672impl WriteLocks {
1673    /// Create a [`WriteLocksBuilder`] pre-defining all of the locks we need.
1674    ///
1675    /// When "finishing" the builder with [`WriteLocksBuilder::all_or_nothing`], if we haven't
1676    /// acquired all of the necessary locks we drop any partially acquired ones.
1677    pub fn builder(sources: impl IntoIterator<Item = CatalogItemId>) -> WriteLocksBuilder {
1678        let locks = sources.into_iter().map(|gid| (gid, None)).collect();
1679        WriteLocksBuilder { locks }
1680    }
1681
1682    /// Validate this set of [`WriteLocks`] is sufficient for the provided collections.
1683    /// Dropping the currently held locks if it's not.
1684    pub fn validate(
1685        self,
1686        collections: impl Iterator<Item = CatalogItemId>,
1687    ) -> Result<Self, BTreeSet<CatalogItemId>> {
1688        let mut missing = BTreeSet::new();
1689        for collection in collections {
1690            if !self.locks.contains_key(&collection) {
1691                missing.insert(collection);
1692            }
1693        }
1694
1695        if missing.is_empty() {
1696            Ok(self)
1697        } else {
1698            // Explicitly drop the already acquired locks.
1699            drop(self);
1700            Err(missing)
1701        }
1702    }
1703}
1704
1705impl Drop for WriteLocks {
1706    fn drop(&mut self) {
1707        // We may have merged the locks into GroupCommitWriteLocks, thus it could be empty.
1708        if !self.locks.is_empty() {
1709            tracing::info!(
1710                conn_id = %self.conn_id,
1711                locks = ?self.locks,
1712                "dropping write locks",
1713            );
1714        }
1715    }
1716}
1717
1718/// A builder struct that helps us acquire all of the locks we need, or none of them.
1719///
1720/// See [`WriteLocks::builder`].
1721#[derive(Debug)]
1722pub struct WriteLocksBuilder {
1723    locks: BTreeMap<CatalogItemId, Option<tokio::sync::OwnedMutexGuard<()>>>,
1724}
1725
1726impl WriteLocksBuilder {
1727    /// Adds a lock to this builder.
1728    pub fn insert_lock(&mut self, id: CatalogItemId, lock: tokio::sync::OwnedMutexGuard<()>) {
1729        self.locks.insert(id, Some(lock));
1730    }
1731
1732    /// Finish this builder by returning either all of the necessary locks, or none of them.
1733    ///
1734    /// If we fail to acquire all of the locks, returns one of the [`CatalogItemId`]s that we
1735    /// failed to acquire a lock for, that should be awaited so we know when to run again.
1736    pub fn all_or_nothing(self, conn_id: &ConnectionId) -> Result<WriteLocks, CatalogItemId> {
1737        let (locks, missing): (BTreeMap<_, _>, BTreeSet<_>) =
1738            self.locks
1739                .into_iter()
1740                .partition_map(|(gid, lock)| match lock {
1741                    Some(lock) => itertools::Either::Left((gid, lock)),
1742                    None => itertools::Either::Right(gid),
1743                });
1744
1745        match missing.iter().next() {
1746            None => {
1747                tracing::info!(%conn_id, ?locks, "acquired write locks");
1748                Ok(WriteLocks {
1749                    locks,
1750                    conn_id: conn_id.clone(),
1751                })
1752            }
1753            Some(gid) => {
1754                tracing::info!(?missing, "failed to acquire write locks");
1755                // Explicitly drop the already acquired locks.
1756                drop(locks);
1757                Err(*gid)
1758            }
1759        }
1760    }
1761}
1762
1763/// Collection of [`WriteLocks`] gathered during [`group_commit`].
1764///
1765/// Note: This struct should __never__ be used outside of group commit because it attempts to merge
1766/// together several collections of [`WriteLocks`] which if not done carefully can cause deadlocks
1767/// or consistency violations.
1768///
1769/// We must prevent writes from occurring to tables during read then write plans (e.g. `UPDATE`)
1770/// but we can allow blind writes (e.g. `INSERT`) to get committed concurrently at the same
1771/// timestamp when submitting the updates from a read then write plan.
1772///
1773/// Naively it would seem as though we could allow blind writes to occur whenever as blind writes
1774/// could never cause invalid retractions, but it could cause us to violate serializability because
1775/// there is no total order we could define for the transactions. Consider the following scenario:
1776///
1777/// ```text
1778/// table: foo
1779///
1780///  a | b
1781/// --------
1782///  x   2
1783///  y   3
1784///  z   4
1785///
1786/// -- Session(A)
1787/// -- read then write plan, reads at t0, writes at t3, transaction Ta
1788/// DELETE FROM foo WHERE b % 2 = 0;
1789///
1790///
1791/// -- Session(B)
1792/// -- blind write into foo, writes at t1, transaction Tb
1793/// INSERT INTO foo VALUES ('q', 6);
1794/// -- select from foo, reads at t2, transaction Tc
1795/// SELECT * FROM foo;
1796///
1797///
1798/// The times these operations occur at are ordered:
1799/// t0 < t1 < t2 < t3
1800///
1801/// Given the timing of the operations, the transactions must have the following order:
1802///
1803/// * Ta does not observe ('q', 6), so Ta < Tb
1804/// * Tc does observe ('q', 6), so Tb < Tc
1805/// * Tc does not observe the retractions from Ta, so Tc < Ta
1806///
1807/// For total order to exist, Ta < Tb < Tc < Ta, which is impossible.
1808/// ```
1809///
1810/// [`group_commit`]: super::coord::Coordinator::group_commit
1811#[derive(Debug, Default)]
1812pub(crate) struct GroupCommitWriteLocks {
1813    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
1814}
1815
1816impl GroupCommitWriteLocks {
1817    /// Merge a set of [`WriteLocks`] into this collection for group commit.
1818    pub fn merge(&mut self, mut locks: WriteLocks) {
1819        // Note: Ideally we would use `.drain`, but that method doesn't exist for BTreeMap.
1820        //
1821        // See: <https://github.com/rust-lang/rust/issues/81074>
1822        let existing = std::mem::take(&mut locks.locks);
1823        self.locks.extend(existing);
1824    }
1825
1826    /// Returns the collections we're missing locks for, if any.
1827    pub fn missing_locks(
1828        &self,
1829        writes: impl Iterator<Item = CatalogItemId>,
1830    ) -> BTreeSet<CatalogItemId> {
1831        let mut missing = BTreeSet::new();
1832        for write in writes {
1833            if !self.locks.contains_key(&write) {
1834                missing.insert(write);
1835            }
1836        }
1837        missing
1838    }
1839}
1840
1841impl Drop for GroupCommitWriteLocks {
1842    fn drop(&mut self) {
1843        if !self.locks.is_empty() {
1844            tracing::info!(
1845                locks = ?self.locks,
1846                "dropping group commit write locks",
1847            );
1848        }
1849    }
1850}