Skip to main content

mz_adapter/
session.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Per-connection configuration parameters and state.
11
12#![warn(missing_docs)]
13
14use std::collections::btree_map::Entry;
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Debug;
17use std::future::Future;
18use std::mem;
19use std::net::IpAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use derivative::Derivative;
25use itertools::Itertools;
26use mz_adapter_types::connection::ConnectionId;
27use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
28use mz_controller_types::ClusterId;
29use mz_ore::metrics::{MetricsFutureExt, MetricsRegistry};
30use mz_ore::now::{EpochMillis, NowFn};
31use mz_pgwire_common::Format;
32use mz_repr::role_id::RoleId;
33use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
34use mz_repr::{CatalogItemId, Datum, Row, RowIterator, SqlScalarType, TimestampManipulation};
35use mz_sql::ast::{AstInfo, Raw, Statement, TransactionAccessMode};
36use mz_sql::plan::{Params, PlanContext, QueryWhen, StatementDesc};
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::user::{
39    INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER, RoleMetadata, SYSTEM_USER, User,
40};
41use mz_sql::session::vars::IsolationLevel;
42pub use mz_sql::session::vars::{
43    DEFAULT_DATABASE_NAME, EndTransactionAction, SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION,
44    SERVER_PATCH_VERSION, SessionVars, Var,
45};
46use mz_sql_parser::ast::TransactionIsolationLevel;
47use mz_storage_client::client::TableData;
48use mz_storage_types::sources::Timeline;
49use qcell::{QCell, QCellOwner};
50use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
51use tokio::sync::watch;
52use uuid::Uuid;
53
54use crate::catalog::CatalogState;
55use crate::client::RecordFirstRowStream;
56use crate::coord::appends::BuiltinTableAppendNotify;
57use crate::coord::in_memory_oracle::InMemoryTimestampOracle;
58use crate::coord::peek::PeekResponseUnary;
59use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
60use crate::coord::{Coordinator, ExplainContext};
61use crate::error::AdapterError;
62use crate::metrics::{Metrics, SessionMetrics};
63use crate::statement_logging::PreparedStatementLoggingInfo;
64use crate::{AdapterNotice, ExecuteContext};
65
/// Connection ID reserved for dummy sessions created by `Session::dummy`.
///
/// `Session::new` asserts that real sessions never use this ID.
const DUMMY_CONNECTION_ID: ConnectionId = ConnectionId::Static(0);
67
68/// A session holds per-connection state.
69#[derive(Derivative)]
70#[derivative(Debug)]
71pub struct Session<T = mz_repr::Timestamp>
72where
73    T: Debug + Clone + Send + Sync,
74{
75    conn_id: ConnectionId,
76    /// A globally unique identifier for the session. Not to be confused
77    /// with `conn_id`, which may be reused.
78    uuid: Uuid,
79    prepared_statements: BTreeMap<String, PreparedStatement>,
80    portals: BTreeMap<String, Portal>,
81    transaction: TransactionStatus<T>,
82    pcx: Option<PlanContext>,
83    metrics: SessionMetrics,
84    #[derivative(Debug = "ignore")]
85    builtin_updates: Option<BuiltinTableAppendNotify>,
86
87    /// The role metadata of the current session.
88    ///
89    /// Invariant: role_metadata must be `Some` after the user has
90    /// successfully connected to and authenticated with Materialize.
91    ///
92    /// Prefer using this value over [`SessionConfig::user`].
93    //
94    // It would be better for this not to be an Option, but the
95    // `Session` is initialized before the user has connected to
96    // Materialize and is able to look up the `RoleMetadata`. The `Session`
97    // is also used to return an error when no role exists and
98    // therefore there is no valid `RoleMetadata`.
99    role_metadata: Option<RoleMetadata>,
100    client_ip: Option<IpAddr>,
101    vars: SessionVars,
102    notices_tx: mpsc::UnboundedSender<AdapterNotice>,
103    notices_rx: mpsc::UnboundedReceiver<AdapterNotice>,
104    next_transaction_id: TransactionId,
105    secret_key: u32,
106    external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
107    // Token allowing us to access `Arc<QCell<StatementLogging>>`
108    // metadata. We want these to be reference-counted, because the same
109    // statement might be referenced from multiple portals simultaneously.
110    //
111    // However, they can't be `Rc<RefCell<StatementLogging>>`, because
112    // the `Session` is sent around to different threads.
113    //
114    // On the other hand, they don't need to be
115    // `Arc<Mutex<StatementLogging>>`, because they will always be
116    // accessed from the same thread that the `Session` is currently
117    // on. We express this by gating access with this token.
118    #[derivative(Debug = "ignore")]
119    qcell_owner: QCellOwner,
120    session_oracles: BTreeMap<Timeline, InMemoryTimestampOracle<T, NowFn<T>>>,
121    /// Incremented when session state that is relevant to prepared statement planning changes.
122    /// Currently, only changes to `portals` are tracked. Changes to `prepared_statements` don't
123    /// need to be tracked, because prepared statements can't depend on other prepared statements.
124    /// TODO: We might want to track changes also to session variables.
125    /// (`Catalog::transient_revision` similarly tracks changes on the catalog side.)
126    state_revision: u64,
127}
128
129impl<T> SessionMetadata for Session<T>
130where
131    T: Debug + Clone + Send + Sync,
132    T: TimestampManipulation,
133{
134    fn conn_id(&self) -> &ConnectionId {
135        &self.conn_id
136    }
137
138    fn client_ip(&self) -> Option<&IpAddr> {
139        self.client_ip.as_ref()
140    }
141
142    fn pcx(&self) -> &PlanContext {
143        &self
144            .transaction()
145            .inner()
146            .expect("no active transaction")
147            .pcx
148    }
149
150    fn role_metadata(&self) -> &RoleMetadata {
151        self.role_metadata
152            .as_ref()
153            .expect("role_metadata invariant violated")
154    }
155
156    fn vars(&self) -> &SessionVars {
157        &self.vars
158    }
159}
160
161/// Data structure suitable for passing to other threads that need access to some common Session
162/// properties.
163#[derive(Debug)]
164pub struct SessionMeta {
165    conn_id: ConnectionId,
166    client_ip: Option<IpAddr>,
167    pcx: PlanContext,
168    role_metadata: RoleMetadata,
169    vars: SessionVars,
170}
171
172impl SessionMetadata for SessionMeta {
173    fn vars(&self) -> &SessionVars {
174        &self.vars
175    }
176
177    fn conn_id(&self) -> &ConnectionId {
178        &self.conn_id
179    }
180
181    fn client_ip(&self) -> Option<&IpAddr> {
182        self.client_ip.as_ref()
183    }
184
185    fn pcx(&self) -> &PlanContext {
186        &self.pcx
187    }
188
189    fn role_metadata(&self) -> &RoleMetadata {
190        &self.role_metadata
191    }
192}
193
194/// Configures a new [`Session`].
195#[derive(Debug, Clone)]
196pub struct SessionConfig {
197    /// The connection ID for the session.
198    ///
199    /// May be reused after the session terminates.
200    pub conn_id: ConnectionId,
201    /// A universally unique identifier for the session, across all processes,
202    /// region, and all time.
203    ///
204    /// Must not be reused, even after the session terminates.
205    pub uuid: Uuid,
206    /// The peer address of the client
207    pub client_ip: Option<IpAddr>,
208    /// The name of the user associated with the session.
209    pub user: String,
210    /// An optional receiver that the session will periodically check for
211    /// updates to a user's external metadata.
212    pub external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
213    /// Helm chart version
214    pub helm_chart_version: Option<String>,
215}
216
217impl<T: TimestampManipulation> Session<T> {
218    /// Creates a new session for the specified connection ID.
219    pub(crate) fn new(
220        build_info: &'static BuildInfo,
221        config: SessionConfig,
222        metrics: SessionMetrics,
223    ) -> Session<T> {
224        assert_ne!(config.conn_id, DUMMY_CONNECTION_ID);
225        Self::new_internal(build_info, config, metrics)
226    }
227
228    /// Returns a reference-less collection of data usable by other tasks that don't have ownership
229    /// of the Session.
230    pub fn meta(&self) -> SessionMeta {
231        SessionMeta {
232            conn_id: self.conn_id().clone(),
233            client_ip: self.client_ip().copied(),
234            pcx: self.pcx().clone(),
235            role_metadata: self.role_metadata().clone(),
236            vars: self.vars.clone(),
237        }
238
239        // TODO: soft_assert that these are the same as Session.
240    }
241
242    /// Creates new statement logging metadata for a one-off
243    /// statement.
244    // Normally, such logging information would be created as part of
245    // allocating a new prepared statement, and a refcounted handle
246    // would be copied from that prepared statement to portals during
247    // binding. However, we also support (via `Command::declare`)
248    // binding a statement directly to a portal without creating an
249    // intermediate prepared statement. Thus, for those cases, a
250    // mechanism for generating the logging metadata directly is needed.
251    pub(crate) fn mint_logging<A: AstInfo>(
252        &self,
253        raw_sql: String,
254        stmt: Option<&Statement<A>>,
255        now: EpochMillis,
256    ) -> Arc<QCell<PreparedStatementLoggingInfo>> {
257        Arc::new(QCell::new(
258            &self.qcell_owner,
259            PreparedStatementLoggingInfo::still_to_log(
260                raw_sql,
261                stmt,
262                now,
263                "".to_string(),
264                self.uuid,
265                false,
266            ),
267        ))
268    }
269
270    pub(crate) fn qcell_ro<'a, T2: 'a>(&'a self, cell: &'a Arc<QCell<T2>>) -> &'a T2 {
271        self.qcell_owner.ro(&*cell)
272    }
273
274    pub(crate) fn qcell_rw<'a, T2: 'a>(&'a mut self, cell: &'a Arc<QCell<T2>>) -> &'a mut T2 {
275        self.qcell_owner.rw(&*cell)
276    }
277
278    /// Returns a unique ID for the session.
279    /// Not to be confused with `connection_id`, which can be reused.
280    pub fn uuid(&self) -> Uuid {
281        self.uuid
282    }
283
284    /// Creates a new dummy session.
285    ///
286    /// Dummy sessions are intended for use when executing queries on behalf of
287    /// the system itself, rather than on behalf of a user.
288    pub fn dummy() -> Session<T> {
289        let registry = MetricsRegistry::new();
290        let metrics = Metrics::register_into(&registry);
291        let metrics = metrics.session_metrics();
292        let mut dummy = Self::new_internal(
293            &DUMMY_BUILD_INFO,
294            SessionConfig {
295                conn_id: DUMMY_CONNECTION_ID,
296                uuid: Uuid::new_v4(),
297                user: SYSTEM_USER.name.clone(),
298                client_ip: None,
299                external_metadata_rx: None,
300                helm_chart_version: None,
301            },
302            metrics,
303        );
304        dummy.initialize_role_metadata(RoleId::User(0));
305        dummy
306    }
307
308    fn new_internal(
309        build_info: &'static BuildInfo,
310        SessionConfig {
311            conn_id,
312            uuid,
313            user,
314            client_ip,
315            mut external_metadata_rx,
316            helm_chart_version,
317        }: SessionConfig,
318        metrics: SessionMetrics,
319    ) -> Session<T> {
320        let (notices_tx, notices_rx) = mpsc::unbounded_channel();
321        let default_cluster = INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER.get(&user);
322        let user = User {
323            name: user,
324            internal_metadata: None,
325            external_metadata: external_metadata_rx
326                .as_mut()
327                .map(|rx| rx.borrow_and_update().clone()),
328        };
329        let mut vars = SessionVars::new_unchecked(build_info, user, helm_chart_version);
330        if let Some(default_cluster) = default_cluster {
331            vars.set_cluster(default_cluster.clone());
332        }
333        Session {
334            conn_id,
335            uuid,
336            transaction: TransactionStatus::Default,
337            pcx: None,
338            metrics,
339            builtin_updates: None,
340            prepared_statements: BTreeMap::new(),
341            portals: BTreeMap::new(),
342            role_metadata: None,
343            client_ip,
344            vars,
345            notices_tx,
346            notices_rx,
347            next_transaction_id: 0,
348            secret_key: rand::random(),
349            external_metadata_rx,
350            qcell_owner: QCellOwner::new(),
351            session_oracles: BTreeMap::new(),
352            state_revision: 0,
353        }
354    }
355
356    /// Returns the secret key associated with the session.
357    pub fn secret_key(&self) -> u32 {
358        self.secret_key
359    }
360
361    fn new_pcx(&self, mut wall_time: DateTime<Utc>) -> PlanContext {
362        if let Some(mock_time) = self.vars().unsafe_new_transaction_wall_time() {
363            wall_time = *mock_time;
364        }
365        PlanContext::new(wall_time)
366    }
367
368    /// Starts an explicit transaction, or changes an implicit to an explicit
369    /// transaction.
370    pub fn start_transaction(
371        &mut self,
372        wall_time: DateTime<Utc>,
373        access: Option<TransactionAccessMode>,
374        isolation_level: Option<TransactionIsolationLevel>,
375    ) -> Result<(), AdapterError> {
376        // Check that current transaction state is compatible with new `access`
377        if let Some(txn) = self.transaction.inner() {
378            // `READ WRITE` prohibited if:
379            // - Currently in `READ ONLY`
380            // - Already performed a query
381            let read_write_prohibited = match txn.ops {
382                TransactionOps::Peeks { .. } | TransactionOps::Subscribe => {
383                    txn.access == Some(TransactionAccessMode::ReadOnly)
384                }
385                TransactionOps::None
386                | TransactionOps::Writes(_)
387                | TransactionOps::SingleStatement { .. }
388                | TransactionOps::DDL { .. } => false,
389            };
390
391            if read_write_prohibited && access == Some(TransactionAccessMode::ReadWrite) {
392                return Err(AdapterError::ReadWriteUnavailable);
393            }
394        }
395
396        match std::mem::take(&mut self.transaction) {
397            TransactionStatus::Default => {
398                let id = self.next_transaction_id;
399                self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
400                self.transaction = TransactionStatus::InTransaction(Transaction {
401                    pcx: self.new_pcx(wall_time),
402                    ops: TransactionOps::None,
403                    write_lock_guards: None,
404                    access,
405                    id,
406                });
407            }
408            TransactionStatus::Started(mut txn)
409            | TransactionStatus::InTransactionImplicit(mut txn)
410            | TransactionStatus::InTransaction(mut txn) => {
411                if access.is_some() {
412                    txn.access = access;
413                }
414                self.transaction = TransactionStatus::InTransaction(txn);
415            }
416            TransactionStatus::Failed(_) => unreachable!(),
417        };
418
419        if let Some(isolation_level) = isolation_level {
420            self.vars
421                .set_local_transaction_isolation(isolation_level.into());
422        }
423
424        Ok(())
425    }
426
427    /// Starts either a single statement or implicit transaction based on the
428    /// number of statements, but only if no transaction has been started already.
429    pub fn start_transaction_implicit(&mut self, wall_time: DateTime<Utc>, stmts: usize) {
430        if let TransactionStatus::Default = self.transaction {
431            let id = self.next_transaction_id;
432            self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
433            let txn = Transaction {
434                pcx: self.new_pcx(wall_time),
435                ops: TransactionOps::None,
436                write_lock_guards: None,
437                access: None,
438                id,
439            };
440            match stmts {
441                1 => self.transaction = TransactionStatus::Started(txn),
442                n if n > 1 => self.transaction = TransactionStatus::InTransactionImplicit(txn),
443                _ => {}
444            }
445        }
446    }
447
448    /// Starts a single statement transaction, but only if no transaction has been started already.
449    pub fn start_transaction_single_stmt(&mut self, wall_time: DateTime<Utc>) {
450        self.start_transaction_implicit(wall_time, 1);
451    }
452
453    /// Clears a transaction, setting its state to Default and destroying all
454    /// portals. Returned are:
455    /// - sinks that were started in this transaction and need to be dropped
456    /// - the cleared transaction so its operations can be handled
457    ///
458    /// The [Postgres protocol docs](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) specify:
459    /// > a named portal object lasts till the end of the current transaction
460    /// and
461    /// > An unnamed portal is destroyed at the end of the transaction
462    #[must_use]
463    pub fn clear_transaction(&mut self) -> TransactionStatus<T> {
464        self.portals.clear();
465        self.pcx = None;
466        self.state_revision += 1;
467        mem::take(&mut self.transaction)
468    }
469
470    /// Marks the current transaction as failed.
471    pub fn fail_transaction(mut self) -> Self {
472        match self.transaction {
473            TransactionStatus::Default => unreachable!(),
474            TransactionStatus::Started(txn)
475            | TransactionStatus::InTransactionImplicit(txn)
476            | TransactionStatus::InTransaction(txn) => {
477                self.transaction = TransactionStatus::Failed(txn);
478            }
479            TransactionStatus::Failed(_) => {}
480        };
481        self
482    }
483
484    /// Returns the current transaction status.
485    pub fn transaction(&self) -> &TransactionStatus<T> {
486        &self.transaction
487    }
488
489    /// Returns the current transaction status.
490    pub fn transaction_mut(&mut self) -> &mut TransactionStatus<T> {
491        &mut self.transaction
492    }
493
494    /// Returns the session's transaction code.
495    pub fn transaction_code(&self) -> TransactionCode {
496        self.transaction().into()
497    }
498
499    /// Adds operations to the current transaction. An error is produced if
500    /// they cannot be merged (i.e., a timestamp-dependent read cannot be
501    /// merged to an insert).
502    pub fn add_transaction_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
503        self.transaction.add_ops(add_ops)
504    }
505
506    /// Returns a channel on which to send notices to the session.
507    pub fn retain_notice_transmitter(&self) -> UnboundedSender<AdapterNotice> {
508        self.notices_tx.clone()
509    }
510
511    /// Adds a notice to the session.
512    pub fn add_notice(&self, notice: AdapterNotice) {
513        self.add_notices([notice])
514    }
515
516    /// Adds multiple notices to the session.
517    pub fn add_notices(&self, notices: impl IntoIterator<Item = AdapterNotice>) {
518        for notice in notices {
519            let _ = self.notices_tx.send(notice);
520        }
521    }
522
523    /// Awaits a possible notice.
524    ///
525    /// This method is cancel safe.
526    pub async fn recv_notice(&mut self) -> AdapterNotice {
527        // This method is cancel safe because recv is cancel safe.
528        loop {
529            let notice = self
530                .notices_rx
531                .recv()
532                .await
533                .expect("Session also holds a sender, so recv won't ever return None");
534            match self.notice_filter(notice) {
535                Some(notice) => return notice,
536                None => continue,
537            }
538        }
539    }
540
541    /// Returns a draining iterator over the notices attached to the session.
542    pub fn drain_notices(&mut self) -> Vec<AdapterNotice> {
543        let mut notices = Vec::new();
544        while let Ok(notice) = self.notices_rx.try_recv() {
545            if let Some(notice) = self.notice_filter(notice) {
546                notices.push(notice);
547            }
548        }
549        notices
550    }
551
552    /// Returns Some if the notice should be reported, otherwise None.
553    fn notice_filter(&self, notice: AdapterNotice) -> Option<AdapterNotice> {
554        // Filter out low threshold severity.
555        let minimum_client_severity = self.vars.client_min_messages();
556        let sev = notice.severity();
557        if !minimum_client_severity.should_output_to_client(&sev) {
558            return None;
559        }
560        // Filter out notices for other clusters.
561        if let AdapterNotice::ClusterReplicaStatusChanged { cluster, .. } = &notice {
562            if cluster != self.vars.cluster() {
563                return None;
564            }
565        }
566        Some(notice)
567    }
568
569    /// Sets the transaction ops to `TransactionOps::None`. Must only be used after
570    /// verifying that no transaction anomalies will occur if cleared.
571    pub fn clear_transaction_ops(&mut self) {
572        if let Some(txn) = self.transaction.inner_mut() {
573            txn.ops = TransactionOps::None;
574        }
575    }
576
577    /// If the current transaction ops belong to a read, then sets the
578    /// ops to `None`, returning the old read timestamp context if
579    /// any existed. Must only be used after verifying that no transaction
580    /// anomalies will occur if cleared.
581    pub fn take_transaction_timestamp_context(&mut self) -> Option<TimestampContext<T>> {
582        if let Some(Transaction { ops, .. }) = self.transaction.inner_mut() {
583            if let TransactionOps::Peeks { .. } = ops {
584                let ops = std::mem::take(ops);
585                Some(
586                    ops.timestamp_determination()
587                        .expect("checked above")
588                        .timestamp_context,
589                )
590            } else {
591                None
592            }
593        } else {
594            None
595        }
596    }
597
598    /// Returns the transaction's read timestamp determination, if set.
599    ///
600    /// Returns `None` if there is no active transaction, or if the active
601    /// transaction is not a read transaction.
602    pub fn get_transaction_timestamp_determination(&self) -> Option<TimestampDetermination<T>> {
603        match self.transaction.inner() {
604            Some(Transaction {
605                pcx: _,
606                ops: TransactionOps::Peeks { determination, .. },
607                write_lock_guards: _,
608                access: _,
609                id: _,
610            }) => Some(determination.clone()),
611            _ => None,
612        }
613    }
614
615    /// Whether this session has a timestamp for a read transaction.
616    pub fn contains_read_timestamp(&self) -> bool {
617        matches!(
618            self.transaction.inner(),
619            Some(Transaction {
620                pcx: _,
621                ops: TransactionOps::Peeks {
622                    determination: TimestampDetermination {
623                        timestamp_context: TimestampContext::TimelineTimestamp { .. },
624                        ..
625                    },
626                    ..
627                },
628                write_lock_guards: _,
629                access: _,
630                id: _,
631            })
632        )
633    }
634
635    /// Registers the prepared statement under `name`.
636    pub fn set_prepared_statement(
637        &mut self,
638        name: String,
639        stmt: Option<Statement<Raw>>,
640        raw_sql: String,
641        desc: StatementDesc,
642        state_revision: StateRevision,
643        now: EpochMillis,
644    ) {
645        let logging = PreparedStatementLoggingInfo::still_to_log(
646            raw_sql,
647            stmt.as_ref(),
648            now,
649            name.clone(),
650            self.uuid,
651            false,
652        );
653        let statement = PreparedStatement {
654            stmt,
655            desc,
656            state_revision,
657            logging: Arc::new(QCell::new(&self.qcell_owner, logging)),
658        };
659        self.prepared_statements.insert(name, statement);
660    }
661
662    /// Removes the prepared statement associated with `name`.
663    ///
664    /// Returns whether a statement previously existed.
665    pub fn remove_prepared_statement(&mut self, name: &str) -> bool {
666        self.prepared_statements.remove(name).is_some()
667    }
668
669    /// Removes all prepared statements.
670    pub fn remove_all_prepared_statements(&mut self) {
671        self.prepared_statements.clear();
672    }
673
674    /// Retrieves the prepared statement associated with `name`.
675    ///
676    /// This is unverified and could be incorrect if the underlying catalog has
677    /// changed.
678    pub fn get_prepared_statement_unverified(&self, name: &str) -> Option<&PreparedStatement> {
679        self.prepared_statements.get(name)
680    }
681
682    /// Retrieves the prepared statement associated with `name`.
683    ///
684    /// This is unverified and could be incorrect if the underlying catalog has
685    /// changed.
686    pub fn get_prepared_statement_mut_unverified(
687        &mut self,
688        name: &str,
689    ) -> Option<&mut PreparedStatement> {
690        self.prepared_statements.get_mut(name)
691    }
692
693    /// Returns the prepared statements for the session.
694    pub fn prepared_statements(&self) -> &BTreeMap<String, PreparedStatement> {
695        &self.prepared_statements
696    }
697
698    /// Returns the portals for the session.
699    pub fn portals(&self) -> &BTreeMap<String, Portal> {
700        &self.portals
701    }
702
703    /// Binds the specified portal to the specified prepared statement.
704    ///
705    /// If the prepared statement contains parameters, the values and types of
706    /// those parameters must be provided in `params`. It is the caller's
707    /// responsibility to ensure that the correct number of parameters is
708    /// provided.
709    ///
710    /// The `results_formats` parameter sets the desired format of the results,
711    /// and is stored on the portal.
712    pub fn set_portal(
713        &mut self,
714        portal_name: String,
715        desc: StatementDesc,
716        stmt: Option<Statement<Raw>>,
717        logging: Arc<QCell<PreparedStatementLoggingInfo>>,
718        params: Vec<(Datum, SqlScalarType)>,
719        result_formats: Vec<Format>,
720        state_revision: StateRevision,
721    ) -> Result<(), AdapterError> {
722        // The empty portal can be silently replaced.
723        if !portal_name.is_empty() && self.portals.contains_key(&portal_name) {
724            return Err(AdapterError::DuplicateCursor(portal_name));
725        }
726        self.state_revision += 1;
727        let param_types = desc.param_types.clone();
728        self.portals.insert(
729            portal_name,
730            Portal {
731                stmt: stmt.map(Arc::new),
732                desc,
733                state_revision,
734                parameters: Params {
735                    datums: Row::pack(params.iter().map(|(d, _t)| d)),
736                    execute_types: params.into_iter().map(|(_d, t)| t).collect(),
737                    expected_types: param_types,
738                },
739                result_formats,
740                state: PortalState::NotStarted,
741                logging,
742                lifecycle_timestamps: None,
743            },
744        );
745        Ok(())
746    }
747
748    /// Removes the specified portal.
749    ///
750    /// If there is no such portal, this method does nothing. Returns whether that portal existed.
751    pub fn remove_portal(&mut self, portal_name: &str) -> bool {
752        self.state_revision += 1;
753        self.portals.remove(portal_name).is_some()
754    }
755
756    /// Retrieves a reference to the specified portal.
757    ///
758    /// If there is no such portal, returns `None`.
759    pub fn get_portal_unverified(&self, portal_name: &str) -> Option<&Portal> {
760        self.portals.get(portal_name)
761    }
762
763    /// Retrieves a mutable reference to the specified portal.
764    ///
765    /// If there is no such portal, returns `None`.
766    ///
767    /// Note: When using the returned `PortalRefMut`, there is no need to increment
768    /// `Session::state_revision`, because the portal's meaning is not changed.
769    pub fn get_portal_unverified_mut(&mut self, portal_name: &str) -> Option<PortalRefMut<'_>> {
770        self.portals.get_mut(portal_name).map(|p| PortalRefMut {
771            stmt: &p.stmt,
772            desc: &p.desc,
773            state_revision: &mut p.state_revision,
774            parameters: &mut p.parameters,
775            result_formats: &mut p.result_formats,
776            logging: &mut p.logging,
777            state: &mut p.state,
778            lifecycle_timestamps: &mut p.lifecycle_timestamps,
779        })
780    }
781
782    /// Creates and installs a new portal.
783    pub fn create_new_portal(
784        &mut self,
785        stmt: Option<Statement<Raw>>,
786        logging: Arc<QCell<PreparedStatementLoggingInfo>>,
787        desc: StatementDesc,
788        parameters: Params,
789        result_formats: Vec<Format>,
790        state_revision: StateRevision,
791    ) -> Result<String, AdapterError> {
792        self.state_revision += 1;
793
794        // See: https://github.com/postgres/postgres/blob/84f5c2908dad81e8622b0406beea580e40bb03ac/src/backend/utils/mmgr/portalmem.c#L234
795        for i in 0usize.. {
796            let name = format!("<unnamed portal {}>", i);
797            match self.portals.entry(name.clone()) {
798                Entry::Occupied(_) => continue,
799                Entry::Vacant(entry) => {
800                    entry.insert(Portal {
801                        stmt: stmt.map(Arc::new),
802                        desc,
803                        state_revision,
804                        parameters,
805                        result_formats,
806                        state: PortalState::NotStarted,
807                        logging,
808                        lifecycle_timestamps: None,
809                    });
810                    return Ok(name);
811                }
812            }
813        }
814
815        coord_bail!("unable to create a new portal");
816    }
817
818    /// Resets the session to its initial state. Returns sinks that need to be
819    /// dropped.
820    pub fn reset(&mut self) {
821        let _ = self.clear_transaction();
822        self.prepared_statements.clear();
823        self.vars.reset_all();
824    }
825
826    /// Returns the [application_name] that created this session.
827    ///
828    /// [application_name]: (https://www.postgresql.org/docs/current/runtime-config-logging.html#GUC-APPLICATION-NAME)
829    pub fn application_name(&self) -> &str {
830        self.vars.application_name()
831    }
832
833    /// Returns a reference to the variables in this session.
834    pub fn vars(&self) -> &SessionVars {
835        &self.vars
836    }
837
838    /// Returns a mutable reference to the variables in this session.
839    pub fn vars_mut(&mut self) -> &mut SessionVars {
840        &mut self.vars
841    }
842
843    /// Grants a set of write locks to this session's inner [`Transaction`].
844    ///
845    /// # Panics
846    /// If the inner transaction is idle. See [`TransactionStatus::try_grant_write_locks`].
847    ///
848    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
849        self.transaction.try_grant_write_locks(guards)
850    }
851
852    /// Drains any external metadata updates and applies the changes from the latest update.
853    pub fn apply_external_metadata_updates(&mut self) {
854        // If no sender is registered then there isn't anything to do.
855        let Some(rx) = &mut self.external_metadata_rx else {
856            return;
857        };
858
859        // If the value hasn't changed then return.
860        if !rx.has_changed().unwrap_or(false) {
861            return;
862        }
863
864        // Update our metadata! Note the short critical section (just a clone) to avoid blocking
865        // the sending side of this watch channel.
866        let metadata = rx.borrow_and_update().clone();
867        self.vars.set_external_user_metadata(metadata);
868    }
869
870    /// Applies the internal user metadata to the session.
871    pub fn apply_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
872        self.vars.set_internal_user_metadata(metadata);
873    }
874
875    /// Initializes the session's role metadata.
876    pub fn initialize_role_metadata(&mut self, role_id: RoleId) {
877        self.role_metadata = Some(RoleMetadata::new(role_id));
878    }
879
880    /// Ensures that a timestamp oracle exists for `timeline` and returns a mutable reference to
881    /// the timestamp oracle.
882    pub fn ensure_timestamp_oracle(
883        &mut self,
884        timeline: Timeline,
885    ) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
886        self.session_oracles
887            .entry(timeline)
888            .or_insert_with(|| InMemoryTimestampOracle::new(T::minimum(), NowFn::from(T::minimum)))
889    }
890
891    /// Ensures that a timestamp oracle exists for reads and writes from/to a local input and
892    /// returns a mutable reference to the timestamp oracle.
893    pub fn ensure_local_timestamp_oracle(&mut self) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
894        self.ensure_timestamp_oracle(Timeline::EpochMilliseconds)
895    }
896
897    /// Returns a reference to the timestamp oracle for `timeline`.
898    pub fn get_timestamp_oracle(
899        &self,
900        timeline: &Timeline,
901    ) -> Option<&InMemoryTimestampOracle<T, NowFn<T>>> {
902        self.session_oracles.get(timeline)
903    }
904
905    /// If the current session is using the Strong Session Serializable isolation level advance the
906    /// session local timestamp oracle to `write_ts`.
907    pub fn apply_write(&mut self, timestamp: T) {
908        if self.vars().transaction_isolation() == &IsolationLevel::StrongSessionSerializable {
909            self.ensure_local_timestamp_oracle().apply_write(timestamp);
910        }
911    }
912
913    /// Returns the [`SessionMetrics`] instance associated with this [`Session`].
914    pub fn metrics(&self) -> &SessionMetrics {
915        &self.metrics
916    }
917
918    /// Sets the `BuiltinTableAppendNotify` for this session.
919    pub fn set_builtin_table_updates(&mut self, fut: BuiltinTableAppendNotify) {
920        let prev = self.builtin_updates.replace(fut);
921        mz_ore::soft_assert_or_log!(prev.is_none(), "replacing old builtin table notify");
922    }
923
924    /// Takes the stashed `BuiltinTableAppendNotify`, if one exists, and returns a [`Future`] that
925    /// waits for the writes to complete.
926    pub fn clear_builtin_table_updates(&mut self) -> Option<impl Future<Output = ()> + 'static> {
927        if let Some(fut) = self.builtin_updates.take() {
928            // Record how long we blocked for, if we blocked at all.
929            let histogram = self
930                .metrics()
931                .session_startup_table_writes_seconds()
932                .clone();
933            Some(async move {
934                fut.wall_time().observe(histogram).await;
935            })
936        } else {
937            None
938        }
939    }
940
941    /// Return the state_revision of the session, which can be used by dependent objects for knowing
942    /// when to re-plan due to session state changes.
943    pub fn state_revision(&self) -> u64 {
944        self.state_revision
945    }
946}
947
/// A prepared statement.
///
/// Holds the parsed statement (if any), its description, the state revision at which it was
/// last verified, and a shared handle to its statement-logging metadata.
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct PreparedStatement {
    /// The parsed statement, or `None` for the empty query.
    stmt: Option<Statement<Raw>>,
    /// The description of the statement.
    desc: StatementDesc,
    /// The most recent state revision that has verified this statement.
    pub state_revision: StateRevision,
    /// Shared handle to the statement-logging metadata; excluded from the derived `Debug`.
    #[derivative(Debug = "ignore")]
    logging: Arc<QCell<PreparedStatementLoggingInfo>>,
}
959
960impl PreparedStatement {
961    /// Returns the AST associated with this prepared statement,
962    /// if the prepared statement was not the empty query.
963    pub fn stmt(&self) -> Option<&Statement<Raw>> {
964        self.stmt.as_ref()
965    }
966
967    /// Returns the description of the prepared statement.
968    pub fn desc(&self) -> &StatementDesc {
969        &self.desc
970    }
971
972    /// Returns a handle to the metadata for statement logging.
973    pub fn logging(&self) -> &Arc<QCell<PreparedStatementLoggingInfo>> {
974        &self.logging
975    }
976}
977
/// A portal represents the execution state of a running or runnable query.
///
/// The bound statement is held behind an [`Arc`]. The `logging` and `state` fields are
/// excluded from the derived `Debug` output.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Portal {
    /// The statement that is bound to this portal.
    pub stmt: Option<Arc<Statement<Raw>>>,
    /// The statement description.
    pub desc: StatementDesc,
    /// The most recent state revision that has verified this portal.
    pub state_revision: StateRevision,
    /// The bound values for the parameters in the prepared statement, if any.
    pub parameters: Params,
    /// The desired output format for each column in the result set.
    pub result_formats: Vec<Format>,
    /// A handle to metadata needed for statement logging.
    #[derivative(Debug = "ignore")]
    pub logging: Arc<QCell<PreparedStatementLoggingInfo>>,
    /// The execution state of the portal.
    #[derivative(Debug = "ignore")]
    pub state: PortalState,
    /// Statement lifecycle timestamps coming from `mz-pgwire`.
    pub lifecycle_timestamps: Option<LifecycleTimestamps>,
}
1001
/// A mutable reference to a portal, capturing its state and associated metadata. Importantly, it
/// does _not_ give _mutable_ access to `stmt` and `desc`, which means that you do not need to
/// increment `Session::state_revision` when modifying fields through a `PortalRefMut`, because the
/// portal's meaning is not changed.
///
/// Every other field is exposed as a `&mut` borrow into the underlying [`Portal`].
pub struct PortalRefMut<'a> {
    /// The statement that is bound to this portal (read-only).
    pub stmt: &'a Option<Arc<Statement<Raw>>>,
    /// The statement description (read-only).
    pub desc: &'a StatementDesc,
    /// The most recent state revision that has verified this portal.
    pub state_revision: &'a mut StateRevision,
    /// The bound values for the parameters in the prepared statement, if any.
    pub parameters: &'a mut Params,
    /// The desired output format for each column in the result set.
    pub result_formats: &'a mut Vec<Format>,
    /// A handle to metadata needed for statement logging.
    pub logging: &'a mut Arc<QCell<PreparedStatementLoggingInfo>>,
    /// The execution state of the portal.
    pub state: &'a mut PortalState,
    /// Statement lifecycle timestamps coming from `mz-pgwire`.
    pub lifecycle_timestamps: &'a mut Option<LifecycleTimestamps>,
}
1024
/// Points to a revision of catalog state and session state. When the current revisions are not the
/// same as the revisions when a prepared statement or a portal was described, we need to check
/// whether the description is still valid.
///
/// Both components are plain integers, so equality is total and the type implements [`Eq`] in
/// addition to [`PartialEq`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StateRevision {
    /// A revision of the catalog.
    pub catalog_revision: u64,
    /// A revision of the session state.
    pub session_state_revision: u64,
}
1035
/// Execution states of a portal.
///
/// No `Debug` is derived here, and [`Portal`] ignores its `state` field in its derived `Debug`
/// output.
pub enum PortalState {
    /// Portal not yet started.
    NotStarted,
    /// Portal is a rows-returning statement in progress with 0 or more rows
    /// remaining.
    InProgress(Option<InProgressRows>),
    /// Portal has completed and should not be re-executed. If the optional string
    /// is present, it is returned as a CommandComplete tag, otherwise an error
    /// is sent.
    Completed(Option<String>),
}
1048
/// State of an in-progress, rows-returning portal.
///
/// The portal is exhausted only when `current` is `None` and the `remaining` stream reports no
/// more rows (see `InProgressRows::no_more_rows`).
pub struct InProgressRows {
    /// The current batch of rows, or `None` if no batch is stashed.
    pub current: Option<Box<dyn RowIterator + Send + Sync>>,
    /// A stream from which to fetch more row batches.
    pub remaining: RecordFirstRowStream,
}
1056
1057impl InProgressRows {
1058    /// Creates a new InProgressRows from a batch stream.
1059    pub fn new(remaining: RecordFirstRowStream) -> Self {
1060        Self {
1061            current: None,
1062            remaining,
1063        }
1064    }
1065
1066    /// Determines whether the underlying stream has ended and there are also no more rows
1067    /// stashed in `current`.
1068    pub fn no_more_rows(&self) -> bool {
1069        self.remaining.no_more_rows && self.current.is_none()
1070    }
1071}
1072
/// A channel of batched rows: the receiving end of an unbounded channel of
/// [`PeekResponseUnary`] messages.
pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
1075
/// Part of statement lifecycle. These are timestamps that come from the Adapter frontend
/// (`mz-pgwire`) part of the lifecycle.
///
/// Attached to a portal through `Portal::lifecycle_timestamps`.
#[derive(Debug, Clone)]
pub struct LifecycleTimestamps {
    /// When the query was received. More specifically, when the tokio recv returned.
    /// For a Simple Query, this is for the whole query, for the Extended Query flow, this is only
    /// for `FrontendMessage::Execute`. (This means that this is after parsing for the
    /// Extended Query flow.)
    pub received: EpochMillis,
}
1086
1087impl LifecycleTimestamps {
1088    /// Creates a new `LifecycleTimestamps`.
1089    pub fn new(received: EpochMillis) -> Self {
1090        Self { received }
1091    }
1092}
1093
/// The transaction status of a session.
///
/// PostgreSQL's transaction states are in backend/access/transam/xact.c.
///
/// Every variant except `Default` carries the [`Transaction`] it describes.
#[derive(Debug)]
pub enum TransactionStatus<T> {
    /// Idle. Matches `TBLOCK_DEFAULT`.
    Default,
    /// Running a single-query transaction. Matches
    /// `TBLOCK_STARTED`. In PostgreSQL, when using the extended query protocol, this
    /// may be upgraded into a multi-statement implicit transaction (see
    /// [`Self::InTransactionImplicit`]). Additionally, some statements may trigger an eager
    /// commit of the implicit transaction,
    /// see: <https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137>. In
    /// Materialize however, we eagerly commit all statements outside of an explicit transaction
    /// when using the extended query protocol. Therefore, we can guarantee that this state will
    /// always be a single-query transaction and never be upgraded into a multi-statement implicit
    /// transaction.
    Started(Transaction<T>),
    /// Currently in a transaction issued from a `BEGIN`. Matches `TBLOCK_INPROGRESS`.
    InTransaction(Transaction<T>),
    /// Currently in an implicit transaction started from a multi-statement query
    /// with more than one statement. Matches `TBLOCK_IMPLICIT_INPROGRESS`.
    InTransactionImplicit(Transaction<T>),
    /// In a failed transaction. Matches `TBLOCK_ABORT`.
    Failed(Transaction<T>),
}
1119
1120impl<T: TimestampManipulation> TransactionStatus<T> {
1121    /// Extracts the inner transaction ops and write lock guard if not failed.
1122    pub fn into_ops_and_lock_guard(self) -> (Option<TransactionOps<T>>, Option<WriteLocks>) {
1123        match self {
1124            TransactionStatus::Default | TransactionStatus::Failed(_) => (None, None),
1125            TransactionStatus::Started(txn)
1126            | TransactionStatus::InTransaction(txn)
1127            | TransactionStatus::InTransactionImplicit(txn) => {
1128                (Some(txn.ops), txn.write_lock_guards)
1129            }
1130        }
1131    }
1132
1133    /// Exposes the inner transaction.
1134    pub fn inner(&self) -> Option<&Transaction<T>> {
1135        match self {
1136            TransactionStatus::Default => None,
1137            TransactionStatus::Started(txn)
1138            | TransactionStatus::InTransaction(txn)
1139            | TransactionStatus::InTransactionImplicit(txn)
1140            | TransactionStatus::Failed(txn) => Some(txn),
1141        }
1142    }
1143
1144    /// Exposes the inner transaction.
1145    pub fn inner_mut(&mut self) -> Option<&mut Transaction<T>> {
1146        match self {
1147            TransactionStatus::Default => None,
1148            TransactionStatus::Started(txn)
1149            | TransactionStatus::InTransaction(txn)
1150            | TransactionStatus::InTransactionImplicit(txn)
1151            | TransactionStatus::Failed(txn) => Some(txn),
1152        }
1153    }
1154
1155    /// Whether the transaction's ops are DDL.
1156    pub fn is_ddl(&self) -> bool {
1157        match self {
1158            TransactionStatus::Default => false,
1159            TransactionStatus::Started(txn)
1160            | TransactionStatus::InTransaction(txn)
1161            | TransactionStatus::InTransactionImplicit(txn)
1162            | TransactionStatus::Failed(txn) => {
1163                matches!(txn.ops, TransactionOps::DDL { .. })
1164            }
1165        }
1166    }
1167
1168    /// Expresses whether or not the transaction was implicitly started.
1169    /// However, its negation does not imply explicitly started.
1170    pub fn is_implicit(&self) -> bool {
1171        match self {
1172            TransactionStatus::Started(_) | TransactionStatus::InTransactionImplicit(_) => true,
1173            TransactionStatus::Default
1174            | TransactionStatus::InTransaction(_)
1175            | TransactionStatus::Failed(_) => false,
1176        }
1177    }
1178
1179    /// Whether the transaction may contain multiple statements.
1180    pub fn is_in_multi_statement_transaction(&self) -> bool {
1181        match self {
1182            TransactionStatus::InTransaction(_) | TransactionStatus::InTransactionImplicit(_) => {
1183                true
1184            }
1185            TransactionStatus::Default
1186            | TransactionStatus::Started(_)
1187            | TransactionStatus::Failed(_) => false,
1188        }
1189    }
1190
1191    /// Whether we are in a multi-statement transaction, AND the query is immediate.
1192    pub fn in_immediate_multi_stmt_txn(&self, when: &QueryWhen) -> bool {
1193        self.is_in_multi_statement_transaction() && when == &QueryWhen::Immediately
1194    }
1195
1196    /// Grants the writes lock to the inner transaction, returning an error if the transaction
1197    /// has already been granted write locks.
1198    ///
1199    /// # Panics
1200    /// If `self` is `TransactionStatus::Default`, which indicates that the
1201    /// transaction is idle, which is not appropriate to assign the
1202    /// coordinator's write lock to.
1203    ///
1204    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1205        match self {
1206            TransactionStatus::Default => panic!("cannot grant write lock to txn not yet started"),
1207            TransactionStatus::Started(txn)
1208            | TransactionStatus::InTransaction(txn)
1209            | TransactionStatus::InTransactionImplicit(txn)
1210            | TransactionStatus::Failed(txn) => txn.try_grant_write_locks(guards),
1211        }
1212    }
1213
1214    /// Returns the currently held [`WriteLocks`], if this transaction holds any.
1215    pub fn write_locks(&self) -> Option<&WriteLocks> {
1216        match self {
1217            TransactionStatus::Default => None,
1218            TransactionStatus::Started(txn)
1219            | TransactionStatus::InTransaction(txn)
1220            | TransactionStatus::InTransactionImplicit(txn)
1221            | TransactionStatus::Failed(txn) => txn.write_lock_guards.as_ref(),
1222        }
1223    }
1224
1225    /// The timeline of the transaction, if one exists.
1226    pub fn timeline(&self) -> Option<Timeline> {
1227        match self {
1228            TransactionStatus::Default => None,
1229            TransactionStatus::Started(txn)
1230            | TransactionStatus::InTransaction(txn)
1231            | TransactionStatus::InTransactionImplicit(txn)
1232            | TransactionStatus::Failed(txn) => txn.timeline(),
1233        }
1234    }
1235
1236    /// The cluster of the transaction, if one exists.
1237    pub fn cluster(&self) -> Option<ClusterId> {
1238        match self {
1239            TransactionStatus::Default => None,
1240            TransactionStatus::Started(txn)
1241            | TransactionStatus::InTransaction(txn)
1242            | TransactionStatus::InTransactionImplicit(txn)
1243            | TransactionStatus::Failed(txn) => txn.cluster(),
1244        }
1245    }
1246
1247    /// Snapshot of the catalog that reflects DDL operations run in this transaction.
1248    pub fn catalog_state(&self) -> Option<&CatalogState> {
1249        match self.inner() {
1250            Some(Transaction {
1251                ops: TransactionOps::DDL { state, .. },
1252                ..
1253            }) => Some(state),
1254            _ => None,
1255        }
1256    }
1257
1258    /// Reports whether any operations have been executed as part of this transaction
1259    pub fn contains_ops(&self) -> bool {
1260        match self.inner() {
1261            Some(txn) => txn.contains_ops(),
1262            None => false,
1263        }
1264    }
1265
1266    /// Checks whether the current state of this transaction allows writes
1267    /// (adding write ops).
1268    /// transaction
1269    pub fn allows_writes(&self) -> bool {
1270        match self {
1271            TransactionStatus::Started(Transaction { ops, access, .. })
1272            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1273            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1274                match ops {
1275                    TransactionOps::None => access != &Some(TransactionAccessMode::ReadOnly),
1276                    TransactionOps::Peeks { determination, .. } => {
1277                        // If-and-only-if peeks thus far do not have a timestamp
1278                        // (i.e. they are constant), we can switch to a write
1279                        // transaction.
1280                        !determination.timestamp_context.contains_timestamp()
1281                    }
1282                    TransactionOps::Subscribe => false,
1283                    TransactionOps::Writes(_) => true,
1284                    TransactionOps::SingleStatement { .. } => false,
1285                    TransactionOps::DDL { .. } => false,
1286                }
1287            }
1288            TransactionStatus::Default | TransactionStatus::Failed(_) => {
1289                unreachable!()
1290            }
1291        }
1292    }
1293
1294    /// Adds operations to the current transaction. An error is produced if they cannot be merged
1295    /// (i.e., a timestamp-dependent read cannot be merged to an insert).
1296    ///
1297    /// The `DDL` variant is an exception and does not merge operations, but instead overwrites the
1298    /// old ops with the new ops. This is correct because it is only used in conjunction with the
1299    /// Dry Run catalog op which returns an error containing all of the ops, and those ops are
1300    /// passed to this function which then overwrites.
1301    ///
1302    /// # Panics
1303    /// If the operations are compatible but the operation metadata doesn't match. Such as reads at
1304    /// different timestamps, reads on different timelines, reads on different clusters, etc. It's
1305    /// up to the caller to make sure these are aligned.
1306    pub fn add_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
1307        match self {
1308            TransactionStatus::Started(Transaction { ops, access, .. })
1309            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1310            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1311                match ops {
1312                    TransactionOps::None => {
1313                        if matches!(access, Some(TransactionAccessMode::ReadOnly))
1314                            && matches!(add_ops, TransactionOps::Writes(_))
1315                        {
1316                            return Err(AdapterError::ReadOnlyTransaction);
1317                        }
1318                        *ops = add_ops;
1319                    }
1320                    TransactionOps::Peeks {
1321                        determination,
1322                        cluster_id,
1323                        requires_linearization,
1324                    } => match add_ops {
1325                        TransactionOps::Peeks {
1326                            determination: add_timestamp_determination,
1327                            cluster_id: add_cluster_id,
1328                            requires_linearization: add_requires_linearization,
1329                        } => {
1330                            assert_eq!(*cluster_id, add_cluster_id);
1331                            match (
1332                                &determination.timestamp_context,
1333                                &add_timestamp_determination.timestamp_context,
1334                            ) {
1335                                (
1336                                    TimestampContext::TimelineTimestamp {
1337                                        timeline: txn_timeline,
1338                                        chosen_ts: txn_ts,
1339                                        oracle_ts: _,
1340                                    },
1341                                    TimestampContext::TimelineTimestamp {
1342                                        timeline: add_timeline,
1343                                        chosen_ts: add_ts,
1344                                        oracle_ts: _,
1345                                    },
1346                                ) => {
1347                                    assert_eq!(txn_timeline, add_timeline);
1348                                    assert_eq!(txn_ts, add_ts);
1349                                }
1350                                (TimestampContext::NoTimestamp, _) => {
1351                                    *determination = add_timestamp_determination
1352                                }
1353                                (_, TimestampContext::NoTimestamp) => {}
1354                            };
1355                            if matches!(requires_linearization, RequireLinearization::NotRequired)
1356                                && matches!(
1357                                    add_requires_linearization,
1358                                    RequireLinearization::Required
1359                                )
1360                            {
1361                                *requires_linearization = add_requires_linearization;
1362                            }
1363                        }
1364                        // Iff peeks thus far do not have a timestamp (i.e.
1365                        // they are constant), we can switch to a write
1366                        // transaction.
1367                        writes @ TransactionOps::Writes(..)
1368                            if !determination.timestamp_context.contains_timestamp() =>
1369                        {
1370                            *ops = writes;
1371                        }
1372                        _ => return Err(AdapterError::ReadOnlyTransaction),
1373                    },
1374                    TransactionOps::Subscribe => {
1375                        return Err(AdapterError::SubscribeOnlyTransaction);
1376                    }
1377                    TransactionOps::Writes(txn_writes) => match add_ops {
1378                        TransactionOps::Writes(mut add_writes) => {
1379                            // We should have already checked the access above, but make sure we don't miss
1380                            // it anyway.
1381                            assert!(!matches!(access, Some(TransactionAccessMode::ReadOnly)));
1382                            txn_writes.append(&mut add_writes);
1383                        }
1384                        // Iff peeks do not have a timestamp (i.e. they are
1385                        // constant), we can permit them.
1386                        TransactionOps::Peeks { determination, .. }
1387                            if !determination.timestamp_context.contains_timestamp() => {}
1388                        _ => {
1389                            return Err(AdapterError::WriteOnlyTransaction);
1390                        }
1391                    },
1392                    TransactionOps::SingleStatement { .. } => {
1393                        return Err(AdapterError::SingleStatementTransaction);
1394                    }
1395                    TransactionOps::DDL {
1396                        ops: og_ops,
1397                        revision: og_revision,
1398                        state: og_state,
1399                        side_effects,
1400                    } => match add_ops {
1401                        TransactionOps::DDL {
1402                            ops: new_ops,
1403                            revision: new_revision,
1404                            side_effects: mut net_new_side_effects,
1405                            state: new_state,
1406                        } => {
1407                            if *og_revision != new_revision {
1408                                return Err(AdapterError::DDLTransactionRace);
1409                            }
1410                            // The old og_ops are overwritten, not extended.
1411                            if !new_ops.is_empty() {
1412                                *og_ops = new_ops;
1413                                *og_state = new_state;
1414                            }
1415                            side_effects.append(&mut net_new_side_effects);
1416                        }
1417                        _ => return Err(AdapterError::DDLOnlyTransaction),
1418                    },
1419                }
1420            }
1421            TransactionStatus::Default | TransactionStatus::Failed(_) => {
1422                unreachable!()
1423            }
1424        }
1425        Ok(())
1426    }
1427}
1428
/// An abstraction allowing us to identify different transactions.
///
/// IDs are only unique per connection, and `Transaction::id` documents that they wrap around to
/// 0 once exhausted.
pub type TransactionId = u64;
1431
1432impl<T> Default for TransactionStatus<T> {
1433    fn default() -> Self {
1434        TransactionStatus::Default
1435    }
1436}
1437
/// State data for transactions.
#[derive(Debug)]
pub struct Transaction<T> {
    /// Plan context.
    pub pcx: PlanContext,
    /// Transaction operations.
    pub ops: TransactionOps<T>,
    /// Uniquely identifies the transaction on a per connection basis.
    /// Two transactions started from separate connections may share the
    /// same ID.
    /// If all IDs have been exhausted, this will wrap around back to 0.
    pub id: TransactionId,
    /// Locks for objects this transaction will operate on. Granted at most once, via
    /// `Transaction::try_grant_write_locks`.
    write_lock_guards: Option<WriteLocks>,
    /// Access mode (read only, read write). Only `Some(ReadOnly)` forbids writes when ops are
    /// added to an as-yet-empty transaction.
    access: Option<TransactionAccessMode>,
}
1455
1456impl<T> Transaction<T> {
1457    /// Tries to grant the write lock to this transaction for the remainder of its lifetime. Errors
1458    /// if this [`Transaction`] has already been granted write locks.
1459    fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1460        match &mut self.write_lock_guards {
1461            Some(existing) => Err(existing),
1462            locks @ None => {
1463                *locks = Some(guards);
1464                Ok(())
1465            }
1466        }
1467    }
1468
1469    /// The timeline of the transaction, if one exists.
1470    fn timeline(&self) -> Option<Timeline> {
1471        match &self.ops {
1472            TransactionOps::Peeks {
1473                determination:
1474                    TimestampDetermination {
1475                        timestamp_context: TimestampContext::TimelineTimestamp { timeline, .. },
1476                        ..
1477                    },
1478                ..
1479            } => Some(timeline.clone()),
1480            TransactionOps::Peeks { .. }
1481            | TransactionOps::None
1482            | TransactionOps::Subscribe
1483            | TransactionOps::Writes(_)
1484            | TransactionOps::SingleStatement { .. }
1485            | TransactionOps::DDL { .. } => None,
1486        }
1487    }
1488
1489    /// The cluster of the transaction, if one exists.
1490    pub fn cluster(&self) -> Option<ClusterId> {
1491        match &self.ops {
1492            TransactionOps::Peeks { cluster_id, .. } => Some(cluster_id.clone()),
1493            TransactionOps::None
1494            | TransactionOps::Subscribe
1495            | TransactionOps::Writes(_)
1496            | TransactionOps::SingleStatement { .. }
1497            | TransactionOps::DDL { .. } => None,
1498        }
1499    }
1500
1501    /// Reports whether any operations have been executed as part of this transaction
1502    fn contains_ops(&self) -> bool {
1503        !matches!(self.ops, TransactionOps::None)
1504    }
1505}
1506
/// A transaction's status code.
///
/// Converts to a single status byte via `From<TransactionCode> for u8`: `b'I'` for
/// [`TransactionCode::Idle`], `b'T'` for [`TransactionCode::InTransaction`], and `b'E'` for
/// [`TransactionCode::Failed`].
#[derive(Debug, Clone, Copy)]
pub enum TransactionCode {
    /// Not currently in a transaction
    Idle,
    /// Currently in a transaction
    InTransaction,
    /// Currently in a transaction block which is failed
    Failed,
}
1517
1518impl From<TransactionCode> for u8 {
1519    fn from(code: TransactionCode) -> Self {
1520        match code {
1521            TransactionCode::Idle => b'I',
1522            TransactionCode::InTransaction => b'T',
1523            TransactionCode::Failed => b'E',
1524        }
1525    }
1526}
1527
1528impl From<TransactionCode> for String {
1529    fn from(code: TransactionCode) -> Self {
1530        char::from(u8::from(code)).to_string()
1531    }
1532}
1533
1534impl<T> From<&TransactionStatus<T>> for TransactionCode {
1535    /// Convert from the Session's version
1536    fn from(status: &TransactionStatus<T>) -> TransactionCode {
1537        match status {
1538            TransactionStatus::Default => TransactionCode::Idle,
1539            TransactionStatus::Started(_) => TransactionCode::InTransaction,
1540            TransactionStatus::InTransaction(_) => TransactionCode::InTransaction,
1541            TransactionStatus::InTransactionImplicit(_) => TransactionCode::InTransaction,
1542            TransactionStatus::Failed(_) => TransactionCode::Failed,
1543        }
1544    }
1545}
1546
/// The type of operation being performed by the transaction.
///
/// This is needed because we currently do not allow mixing reads and writes in
/// a transaction. Use this to record what we have done, and what may need to
/// happen at commit.
#[derive(Derivative)]
#[derivative(Debug)]
pub enum TransactionOps<T> {
    /// The transaction has been initiated, but no statement has yet been executed
    /// in it.
    None,
    // NOTE(review): `SUBSCRIBE` is listed below, but it also has its own `Subscribe`
    // variant; confirm whether this list is stale.
    /// This transaction has had a peek (`SELECT`, `SUBSCRIBE`). If the inner value
    /// has a timestamp, it must only do other peeks. However, if it doesn't
    /// have a timestamp (i.e. the values are constants), the transaction can still
    /// perform writes.
    Peeks {
        /// The timestamp and timestamp related metadata for the peek.
        determination: TimestampDetermination<T>,
        /// The cluster used to execute peeks.
        cluster_id: ClusterId,
        /// Whether this peek needs to be linearized.
        requires_linearization: RequireLinearization,
    },
    /// This transaction has done a `SUBSCRIBE` and must do nothing else.
    Subscribe,
    /// This transaction has had a write (`INSERT`, `UPDATE`, `DELETE`) and must
    /// only do other writes, or reads whose timestamp is None (i.e. constants).
    Writes(Vec<WriteOp>),
    /// This transaction has a prospective statement that will execute during commit.
    SingleStatement {
        /// The prospective statement.
        stmt: Arc<Statement<Raw>>,
        /// The statement params.
        params: mz_sql::plan::Params,
    },
    /// This transaction has run some _simple_ DDL and must do nothing else. Any statement/plan that
    /// uses this must return false in `must_serialize_ddl()` because this is serialized instead in
    /// `sequence_plan()` during `COMMIT`.
    DDL {
        /// Catalog operations that have already run, and must run before each subsequent op.
        ops: Vec<crate::catalog::Op>,
        /// In-memory state that reflects the previously applied ops.
        state: CatalogState,
        /// A list of side effects that should be executed if this DDL transaction commits.
        // Excluded from the derived `Debug` output because boxed closures do not implement
        // `Debug`.
        #[derivative(Debug = "ignore")]
        side_effects: Vec<
            Box<
                dyn for<'a> FnOnce(
                        &'a mut Coordinator,
                        Option<&'a mut ExecuteContext>,
                    ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
                    + Send
                    + Sync,
            >,
        >,
        /// Transient revision of the `Catalog` when this transaction started.
        revision: u64,
    },
}
1606
1607impl<T> TransactionOps<T> {
1608    fn timestamp_determination(self) -> Option<TimestampDetermination<T>> {
1609        match self {
1610            TransactionOps::Peeks { determination, .. } => Some(determination),
1611            TransactionOps::None
1612            | TransactionOps::Subscribe
1613            | TransactionOps::Writes(_)
1614            | TransactionOps::SingleStatement { .. }
1615            | TransactionOps::DDL { .. } => None,
1616        }
1617    }
1618}
1619
1620impl<T> Default for TransactionOps<T> {
1621    fn default() -> Self {
1622        Self::None
1623    }
1624}
1625
/// A write to a single table (e.g. the rows of an `INSERT`) waiting to be committed.
///
/// Pending writes are accumulated in [`TransactionOps::Writes`].
#[derive(Debug, Clone, PartialEq)]
pub struct WriteOp {
    /// The target table.
    pub id: CatalogItemId,
    /// The data rows.
    pub rows: TableData,
}
1634
/// Whether a transaction requires linearization.
///
/// Recorded for each peek in [`TransactionOps::Peeks`]. It can be derived from an
/// `ExplainContext` via the `From<&ExplainContext>` impl.
#[derive(Debug)]
pub enum RequireLinearization {
    /// Linearization is required.
    Required,
    /// Linearization is not required.
    NotRequired,
}
1643
1644impl From<&ExplainContext> for RequireLinearization {
1645    fn from(ctx: &ExplainContext) -> Self {
1646        match ctx {
1647            ExplainContext::None | ExplainContext::PlanInsightsNotice(_) => {
1648                RequireLinearization::Required
1649            }
1650            _ => RequireLinearization::NotRequired,
1651        }
1652    }
1653}
1654
/// A complete set of exclusive locks for writing to collections identified by [`CatalogItemId`]s.
///
/// To prevent deadlocks between two sessions, we do not allow acquiring a partial set of locks;
/// start with [`WriteLocks::builder`]. Dropping this value releases any locks it still holds.
#[derive(Debug)]
pub struct WriteLocks {
    /// The held guard for each locked collection. This is emptied when the locks are merged
    /// into a `GroupCommitWriteLocks`.
    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
    /// Connection that currently holds these locks, used for tracing purposes only.
    conn_id: ConnectionId,
}
1664
1665impl WriteLocks {
1666    /// Create a [`WriteLocksBuilder`] pre-defining all of the locks we need.
1667    ///
1668    /// When "finishing" the builder with [`WriteLocksBuilder::all_or_nothing`], if we haven't
1669    /// acquired all of the necessary locks we drop any partially acquired ones.
1670    pub fn builder(sources: impl IntoIterator<Item = CatalogItemId>) -> WriteLocksBuilder {
1671        let locks = sources.into_iter().map(|gid| (gid, None)).collect();
1672        WriteLocksBuilder { locks }
1673    }
1674
1675    /// Validate this set of [`WriteLocks`] is sufficient for the provided collections.
1676    /// Dropping the currently held locks if it's not.
1677    pub fn validate(
1678        self,
1679        collections: impl Iterator<Item = CatalogItemId>,
1680    ) -> Result<Self, BTreeSet<CatalogItemId>> {
1681        let mut missing = BTreeSet::new();
1682        for collection in collections {
1683            if !self.locks.contains_key(&collection) {
1684                missing.insert(collection);
1685            }
1686        }
1687
1688        if missing.is_empty() {
1689            Ok(self)
1690        } else {
1691            // Explicitly drop the already acquired locks.
1692            drop(self);
1693            Err(missing)
1694        }
1695    }
1696}
1697
1698impl Drop for WriteLocks {
1699    fn drop(&mut self) {
1700        // We may have merged the locks into GroupCommitWriteLocks, thus it could be empty.
1701        if !self.locks.is_empty() {
1702            tracing::info!(
1703                conn_id = %self.conn_id,
1704                locks = ?self.locks,
1705                "dropping write locks",
1706            );
1707        }
1708    }
1709}
1710
/// A builder struct that helps us acquire all of the locks we need, or none of them.
///
/// See [`WriteLocks::builder`].
#[derive(Debug)]
pub struct WriteLocksBuilder {
    /// Every collection we need a lock for. The value is `Some(guard)` once its lock has been
    /// supplied via [`WriteLocksBuilder::insert_lock`], and `None` while it is still missing.
    locks: BTreeMap<CatalogItemId, Option<tokio::sync::OwnedMutexGuard<()>>>,
}
1718
1719impl WriteLocksBuilder {
1720    /// Adds a lock to this builder.
1721    pub fn insert_lock(&mut self, id: CatalogItemId, lock: tokio::sync::OwnedMutexGuard<()>) {
1722        self.locks.insert(id, Some(lock));
1723    }
1724
1725    /// Finish this builder by returning either all of the necessary locks, or none of them.
1726    ///
1727    /// If we fail to acquire all of the locks, returns one of the [`CatalogItemId`]s that we
1728    /// failed to acquire a lock for, that should be awaited so we know when to run again.
1729    pub fn all_or_nothing(self, conn_id: &ConnectionId) -> Result<WriteLocks, CatalogItemId> {
1730        let (locks, missing): (BTreeMap<_, _>, BTreeSet<_>) =
1731            self.locks
1732                .into_iter()
1733                .partition_map(|(gid, lock)| match lock {
1734                    Some(lock) => itertools::Either::Left((gid, lock)),
1735                    None => itertools::Either::Right(gid),
1736                });
1737
1738        match missing.iter().next() {
1739            None => {
1740                tracing::info!(%conn_id, ?locks, "acquired write locks");
1741                Ok(WriteLocks {
1742                    locks,
1743                    conn_id: conn_id.clone(),
1744                })
1745            }
1746            Some(gid) => {
1747                tracing::info!(?missing, "failed to acquire write locks");
1748                // Explicitly drop the already acquired locks.
1749                drop(locks);
1750                Err(*gid)
1751            }
1752        }
1753    }
1754}
1755
/// Collection of [`WriteLocks`] gathered during [`group_commit`].
///
/// Note: This struct should __never__ be used outside of group commit because it attempts to merge
/// together several collections of [`WriteLocks`], which, if not done carefully, can cause
/// deadlocks or consistency violations.
///
/// We must prevent writes from occurring to tables during read then write plans (e.g. `UPDATE`)
/// but we can allow blind writes (e.g. `INSERT`) to get committed concurrently at the same
/// timestamp when submitting the updates from a read then write plan.
///
/// Naively it would seem as though we could allow blind writes to occur at any time, since blind
/// writes can never cause invalid retractions. However, doing so could violate serializability,
/// because there would be no total order we could define for the transactions. Consider the
/// following scenario:
///
/// ```text
/// table: foo
///
///  a | b
/// --------
///  x   2
///  y   3
///  z   4
///
/// -- Session(A)
/// -- read then write plan, reads at t0, writes at t3, transaction Ta
/// DELETE FROM foo WHERE b % 2 = 0;
///
///
/// -- Session(B)
/// -- blind write into foo, writes at t1, transaction Tb
/// INSERT INTO foo VALUES ('q', 6);
/// -- select from foo, reads at t2, transaction Tc
/// SELECT * FROM foo;
/// ```
///
/// The times these operations occur at are ordered: `t0 < t1 < t2 < t3`.
///
/// Given the timing of the operations, the transactions must have the following order:
///
/// * Ta does not observe `('q', 6)`, so `Ta < Tb`
/// * Tc does observe `('q', 6)`, so `Tb < Tc`
/// * Tc does not observe the retractions from Ta, so `Tc < Ta`
///
/// For a total order to exist we would need `Ta < Tb < Tc < Ta`, which is impossible.
///
/// [`group_commit`]: super::coord::Coordinator::group_commit
#[derive(Debug, Default)]
pub(crate) struct GroupCommitWriteLocks {
    /// Guards merged in from each [`WriteLocks`], keyed by the collection they lock.
    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
}
1808
1809impl GroupCommitWriteLocks {
1810    /// Merge a set of [`WriteLocks`] into this collection for group commit.
1811    pub fn merge(&mut self, mut locks: WriteLocks) {
1812        // Note: Ideally we would use `.drain`, but that method doesn't exist for BTreeMap.
1813        //
1814        // See: <https://github.com/rust-lang/rust/issues/81074>
1815        let existing = std::mem::take(&mut locks.locks);
1816        self.locks.extend(existing);
1817    }
1818
1819    /// Returns the collections we're missing locks for, if any.
1820    pub fn missing_locks(
1821        &self,
1822        writes: impl Iterator<Item = CatalogItemId>,
1823    ) -> BTreeSet<CatalogItemId> {
1824        let mut missing = BTreeSet::new();
1825        for write in writes {
1826            if !self.locks.contains_key(&write) {
1827                missing.insert(write);
1828            }
1829        }
1830        missing
1831    }
1832}
1833
1834impl Drop for GroupCommitWriteLocks {
1835    fn drop(&mut self) {
1836        if !self.locks.is_empty() {
1837            tracing::info!(
1838                locks = ?self.locks,
1839                "dropping group commit write locks",
1840            );
1841        }
1842    }
1843}