Skip to main content

mz_adapter/
session.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Per-connection configuration parameters and state.
11
12#![warn(missing_docs)]
13
14use std::collections::btree_map::Entry;
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Debug;
17use std::future::Future;
18use std::mem;
19use std::net::IpAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use derivative::Derivative;
25use itertools::Itertools;
26use mz_adapter_types::connection::ConnectionId;
27use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
28use mz_controller_types::ClusterId;
29use mz_ore::metrics::{MetricsFutureExt, MetricsRegistry};
30use mz_ore::now::{EpochMillis, NowFn};
31use mz_pgwire_common::Format;
32use mz_repr::role_id::RoleId;
33use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
34use mz_repr::{CatalogItemId, Datum, Row, RowIterator, SqlScalarType, TimestampManipulation};
35use mz_sql::ast::{AstInfo, Raw, Statement, TransactionAccessMode};
36use mz_sql::plan::{Params, PlanContext, QueryWhen, StatementDesc};
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::user::{
39    INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER, RoleMetadata, SYSTEM_USER, User,
40};
41use mz_sql::session::vars::IsolationLevel;
42pub use mz_sql::session::vars::{
43    DEFAULT_DATABASE_NAME, EndTransactionAction, SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION,
44    SERVER_PATCH_VERSION, SessionVars, Var,
45};
46use mz_sql_parser::ast::TransactionIsolationLevel;
47use mz_storage_client::client::TableData;
48use mz_storage_types::sources::Timeline;
49use qcell::{QCell, QCellOwner};
50use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
51use tokio::sync::watch;
52use uuid::Uuid;
53
54use crate::catalog::CatalogState;
55use crate::client::RecordFirstRowStream;
56use crate::coord::appends::BuiltinTableAppendNotify;
57use crate::coord::in_memory_oracle::InMemoryTimestampOracle;
58use crate::coord::peek::PeekResponseUnary;
59use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
60use crate::coord::{Coordinator, ExplainContext};
61use crate::error::AdapterError;
62use crate::metrics::{Metrics, SessionMetrics};
63use crate::statement_logging::PreparedStatementLoggingInfo;
64use crate::{AdapterNotice, ExecuteContext};
65use mz_catalog::durable::Snapshot;
66
/// Connection ID used by `Session::dummy`.
///
/// `Session::new` asserts that sessions created for real connections never
/// carry this ID.
const DUMMY_CONNECTION_ID: ConnectionId = ConnectionId::Static(0);
68
/// A session holds per-connection state.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Session<T = mz_repr::Timestamp>
where
    T: Debug + Clone + Send + Sync,
{
    /// The ID of the connection this session belongs to.
    conn_id: ConnectionId,
    /// A globally unique identifier for the session. Not to be confused
    /// with `conn_id`, which may be reused.
    uuid: Uuid,
    /// Prepared statements registered on this session, keyed by name.
    prepared_statements: BTreeMap<String, PreparedStatement>,
    /// Portals bound on this session, keyed by name. Cleared whenever the
    /// transaction is cleared.
    portals: BTreeMap<String, Portal>,
    /// The status of the session's current transaction.
    transaction: TransactionStatus<T>,
    // NOTE(review): within the visible code this field is only ever set to
    // `None`; `SessionMetadata::pcx` reads the plan context stored in the
    // active transaction instead. Confirm whether this field is still needed.
    pcx: Option<PlanContext>,
    /// Metrics handle supplied when the session was constructed.
    metrics: SessionMetrics,
    // Initialized to `None` at construction. NOTE(review): presumably a pending
    // builtin table append notification; its use is outside this view.
    #[derivative(Debug = "ignore")]
    builtin_updates: Option<BuiltinTableAppendNotify>,

    /// The role metadata of the current session.
    ///
    /// Invariant: role_metadata must be `Some` after the user has
    /// successfully connected to and authenticated with Materialize.
    ///
    /// Prefer using this value over [`SessionConfig::user`].
    //
    // It would be better for this not to be an Option, but the
    // `Session` is initialized before the user has connected to
    // Materialize and is able to look up the `RoleMetadata`. The `Session`
    // is also used to return an error when no role exists and
    // therefore there is no valid `RoleMetadata`.
    role_metadata: Option<RoleMetadata>,
    /// The peer address of the client, if one was provided in the config.
    client_ip: Option<IpAddr>,
    /// The session variables.
    vars: SessionVars,
    /// Sending half of the session's notice channel; cloned out by
    /// `retain_notice_transmitter`.
    notices_tx: mpsc::UnboundedSender<AdapterNotice>,
    /// Receiving half of the session's notice channel.
    notices_rx: mpsc::UnboundedReceiver<AdapterNotice>,
    /// The ID that will be assigned to the next transaction. Advanced with
    /// wrapping arithmetic each time an ID is handed out.
    next_transaction_id: TransactionId,
    /// A random value generated at construction, exposed via `secret_key()`.
    secret_key: u32,
    /// Optional receiver that is polled for updates to the user's external
    /// metadata.
    external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    // Token allowing us to access `Arc<QCell<StatementLogging>>`
    // metadata. We want these to be reference-counted, because the same
    // statement might be referenced from multiple portals simultaneously.
    //
    // However, they can't be `Rc<RefCell<StatementLogging>>`, because
    // the `Session` is sent around to different threads.
    //
    // On the other hand, they don't need to be
    // `Arc<Mutex<StatementLogging>>`, because they will always be
    // accessed from the same thread that the `Session` is currently
    // on. We express this by gating access with this token.
    #[derivative(Debug = "ignore")]
    qcell_owner: QCellOwner,
    // In-memory timestamp oracles keyed by timeline; starts out empty.
    // NOTE(review): how these are populated and used is outside this view.
    session_oracles: BTreeMap<Timeline, InMemoryTimestampOracle<T, NowFn<T>>>,
    /// Incremented when session state that is relevant to prepared statement planning changes.
    /// Currently, only changes to `portals` are tracked. Changes to `prepared_statements` don't
    /// need to be tracked, because prepared statements can't depend on other prepared statements.
    /// TODO: We might want to track changes also to session variables.
    /// (`Catalog::transient_revision` similarly tracks changes on the catalog side.)
    state_revision: u64,
}
129
130impl<T> SessionMetadata for Session<T>
131where
132    T: Debug + Clone + Send + Sync,
133    T: TimestampManipulation,
134{
135    fn conn_id(&self) -> &ConnectionId {
136        &self.conn_id
137    }
138
139    fn client_ip(&self) -> Option<&IpAddr> {
140        self.client_ip.as_ref()
141    }
142
143    fn pcx(&self) -> &PlanContext {
144        &self
145            .transaction()
146            .inner()
147            .expect("no active transaction")
148            .pcx
149    }
150
151    fn role_metadata(&self) -> &RoleMetadata {
152        self.role_metadata
153            .as_ref()
154            .expect("role_metadata invariant violated")
155    }
156
157    fn vars(&self) -> &SessionVars {
158        &self.vars
159    }
160}
161
/// Data structure suitable for passing to other threads that need access to some common Session
/// properties.
///
/// Produced by `Session::meta`, which clones each field out of the session.
#[derive(Debug)]
pub struct SessionMeta {
    /// The connection ID of the originating session.
    conn_id: ConnectionId,
    /// The client's peer address, if known.
    client_ip: Option<IpAddr>,
    /// The plan context of the transaction that was active when the snapshot was taken.
    pcx: PlanContext,
    /// The role metadata of the originating session.
    role_metadata: RoleMetadata,
    /// A copy of the originating session's variables.
    vars: SessionVars,
}
172
173impl SessionMetadata for SessionMeta {
174    fn vars(&self) -> &SessionVars {
175        &self.vars
176    }
177
178    fn conn_id(&self) -> &ConnectionId {
179        &self.conn_id
180    }
181
182    fn client_ip(&self) -> Option<&IpAddr> {
183        self.client_ip.as_ref()
184    }
185
186    fn pcx(&self) -> &PlanContext {
187        &self.pcx
188    }
189
190    fn role_metadata(&self) -> &RoleMetadata {
191        &self.role_metadata
192    }
193}
194
/// Configures a new [`Session`].
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// The connection ID for the session.
    ///
    /// May be reused after the session terminates.
    pub conn_id: ConnectionId,
    /// A universally unique identifier for the session, across all processes,
    /// regions, and all time.
    ///
    /// Must not be reused, even after the session terminates.
    pub uuid: Uuid,
    /// The peer address of the client, if known.
    pub client_ip: Option<IpAddr>,
    /// The name of the user associated with the session.
    pub user: String,
    /// An optional receiver that the session will periodically check for
    /// updates to a user's external metadata.
    pub external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    /// The Helm chart version, if any. It is forwarded to the session
    /// variables when the session is constructed.
    pub helm_chart_version: Option<String>,
}
217
218impl<T: TimestampManipulation> Session<T> {
219    /// Creates a new session for the specified connection ID.
220    pub(crate) fn new(
221        build_info: &'static BuildInfo,
222        config: SessionConfig,
223        metrics: SessionMetrics,
224    ) -> Session<T> {
225        assert_ne!(config.conn_id, DUMMY_CONNECTION_ID);
226        Self::new_internal(build_info, config, metrics)
227    }
228
229    /// Returns a reference-less collection of data usable by other tasks that don't have ownership
230    /// of the Session.
231    pub fn meta(&self) -> SessionMeta {
232        SessionMeta {
233            conn_id: self.conn_id().clone(),
234            client_ip: self.client_ip().copied(),
235            pcx: self.pcx().clone(),
236            role_metadata: self.role_metadata().clone(),
237            vars: self.vars.clone(),
238        }
239
240        // TODO: soft_assert that these are the same as Session.
241    }
242
243    /// Creates new statement logging metadata for a one-off
244    /// statement.
245    // Normally, such logging information would be created as part of
246    // allocating a new prepared statement, and a refcounted handle
247    // would be copied from that prepared statement to portals during
248    // binding. However, we also support (via `Command::declare`)
249    // binding a statement directly to a portal without creating an
250    // intermediate prepared statement. Thus, for those cases, a
251    // mechanism for generating the logging metadata directly is needed.
252    pub(crate) fn mint_logging<A: AstInfo>(
253        &self,
254        raw_sql: String,
255        stmt: Option<&Statement<A>>,
256        now: EpochMillis,
257    ) -> Arc<QCell<PreparedStatementLoggingInfo>> {
258        Arc::new(QCell::new(
259            &self.qcell_owner,
260            PreparedStatementLoggingInfo::still_to_log(
261                raw_sql,
262                stmt,
263                now,
264                "".to_string(),
265                self.uuid,
266                false,
267            ),
268        ))
269    }
270
271    pub(crate) fn qcell_ro<'a, T2: 'a>(&'a self, cell: &'a Arc<QCell<T2>>) -> &'a T2 {
272        self.qcell_owner.ro(&*cell)
273    }
274
275    pub(crate) fn qcell_rw<'a, T2: 'a>(&'a mut self, cell: &'a Arc<QCell<T2>>) -> &'a mut T2 {
276        self.qcell_owner.rw(&*cell)
277    }
278
279    /// Returns a unique ID for the session.
280    /// Not to be confused with `connection_id`, which can be reused.
281    pub fn uuid(&self) -> Uuid {
282        self.uuid
283    }
284
285    /// Creates a new dummy session.
286    ///
287    /// Dummy sessions are intended for use when executing queries on behalf of
288    /// the system itself, rather than on behalf of a user.
289    pub fn dummy() -> Session<T> {
290        let registry = MetricsRegistry::new();
291        let metrics = Metrics::register_into(&registry);
292        let metrics = metrics.session_metrics();
293        let mut dummy = Self::new_internal(
294            &DUMMY_BUILD_INFO,
295            SessionConfig {
296                conn_id: DUMMY_CONNECTION_ID,
297                uuid: Uuid::new_v4(),
298                user: SYSTEM_USER.name.clone(),
299                client_ip: None,
300                external_metadata_rx: None,
301                helm_chart_version: None,
302            },
303            metrics,
304        );
305        dummy.initialize_role_metadata(RoleId::User(0));
306        dummy
307    }
308
309    fn new_internal(
310        build_info: &'static BuildInfo,
311        SessionConfig {
312            conn_id,
313            uuid,
314            user,
315            client_ip,
316            mut external_metadata_rx,
317            helm_chart_version,
318        }: SessionConfig,
319        metrics: SessionMetrics,
320    ) -> Session<T> {
321        let (notices_tx, notices_rx) = mpsc::unbounded_channel();
322        let default_cluster = INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER.get(&user);
323        let user = User {
324            name: user,
325            internal_metadata: None,
326            external_metadata: external_metadata_rx
327                .as_mut()
328                .map(|rx| rx.borrow_and_update().clone()),
329        };
330        let mut vars = SessionVars::new_unchecked(build_info, user, helm_chart_version);
331        if let Some(default_cluster) = default_cluster {
332            vars.set_cluster(default_cluster.clone());
333        }
334        Session {
335            conn_id,
336            uuid,
337            transaction: TransactionStatus::Default,
338            pcx: None,
339            metrics,
340            builtin_updates: None,
341            prepared_statements: BTreeMap::new(),
342            portals: BTreeMap::new(),
343            role_metadata: None,
344            client_ip,
345            vars,
346            notices_tx,
347            notices_rx,
348            next_transaction_id: 0,
349            secret_key: rand::random(),
350            external_metadata_rx,
351            qcell_owner: QCellOwner::new(),
352            session_oracles: BTreeMap::new(),
353            state_revision: 0,
354        }
355    }
356
357    /// Returns the secret key associated with the session.
358    pub fn secret_key(&self) -> u32 {
359        self.secret_key
360    }
361
362    fn new_pcx(&self, mut wall_time: DateTime<Utc>) -> PlanContext {
363        if let Some(mock_time) = self.vars().unsafe_new_transaction_wall_time() {
364            wall_time = *mock_time;
365        }
366        PlanContext::new(wall_time)
367    }
368
369    /// Starts an explicit transaction, or changes an implicit to an explicit
370    /// transaction.
371    pub fn start_transaction(
372        &mut self,
373        wall_time: DateTime<Utc>,
374        access: Option<TransactionAccessMode>,
375        isolation_level: Option<TransactionIsolationLevel>,
376    ) -> Result<(), AdapterError> {
377        // Check that current transaction state is compatible with new `access`
378        if let Some(txn) = self.transaction.inner() {
379            // `READ WRITE` prohibited if:
380            // - Currently in `READ ONLY`
381            // - Already performed a query
382            let read_write_prohibited = match txn.ops {
383                TransactionOps::Peeks { .. } | TransactionOps::Subscribe => {
384                    txn.access == Some(TransactionAccessMode::ReadOnly)
385                }
386                TransactionOps::None
387                | TransactionOps::Writes(_)
388                | TransactionOps::SingleStatement { .. }
389                | TransactionOps::DDL { .. } => false,
390            };
391
392            if read_write_prohibited && access == Some(TransactionAccessMode::ReadWrite) {
393                return Err(AdapterError::ReadWriteUnavailable);
394            }
395        }
396
397        match std::mem::take(&mut self.transaction) {
398            TransactionStatus::Default => {
399                let id = self.next_transaction_id;
400                self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
401                self.transaction = TransactionStatus::InTransaction(Transaction {
402                    pcx: self.new_pcx(wall_time),
403                    ops: TransactionOps::None,
404                    write_lock_guards: None,
405                    access,
406                    id,
407                });
408            }
409            TransactionStatus::Started(mut txn)
410            | TransactionStatus::InTransactionImplicit(mut txn)
411            | TransactionStatus::InTransaction(mut txn) => {
412                if access.is_some() {
413                    txn.access = access;
414                }
415                self.transaction = TransactionStatus::InTransaction(txn);
416            }
417            TransactionStatus::Failed(_) => unreachable!(),
418        };
419
420        if let Some(isolation_level) = isolation_level {
421            self.vars
422                .set_local_transaction_isolation(isolation_level.into());
423        }
424
425        Ok(())
426    }
427
428    /// Starts either a single statement or implicit transaction based on the
429    /// number of statements, but only if no transaction has been started already.
430    pub fn start_transaction_implicit(&mut self, wall_time: DateTime<Utc>, stmts: usize) {
431        if let TransactionStatus::Default = self.transaction {
432            let id = self.next_transaction_id;
433            self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
434            let txn = Transaction {
435                pcx: self.new_pcx(wall_time),
436                ops: TransactionOps::None,
437                write_lock_guards: None,
438                access: None,
439                id,
440            };
441            match stmts {
442                1 => self.transaction = TransactionStatus::Started(txn),
443                n if n > 1 => self.transaction = TransactionStatus::InTransactionImplicit(txn),
444                _ => {}
445            }
446        }
447    }
448
449    /// Starts a single statement transaction, but only if no transaction has been started already.
450    pub fn start_transaction_single_stmt(&mut self, wall_time: DateTime<Utc>) {
451        self.start_transaction_implicit(wall_time, 1);
452    }
453
454    /// Clears a transaction, setting its state to Default and destroying all
455    /// portals. Returned are:
456    /// - sinks that were started in this transaction and need to be dropped
457    /// - the cleared transaction so its operations can be handled
458    ///
459    /// The [Postgres protocol docs](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) specify:
460    /// > a named portal object lasts till the end of the current transaction
461    /// and
462    /// > An unnamed portal is destroyed at the end of the transaction
463    #[must_use]
464    pub fn clear_transaction(&mut self) -> TransactionStatus<T> {
465        self.portals.clear();
466        self.pcx = None;
467        self.state_revision += 1;
468        mem::take(&mut self.transaction)
469    }
470
471    /// Marks the current transaction as failed.
472    pub fn fail_transaction(mut self) -> Self {
473        match self.transaction {
474            TransactionStatus::Default => unreachable!(),
475            TransactionStatus::Started(txn)
476            | TransactionStatus::InTransactionImplicit(txn)
477            | TransactionStatus::InTransaction(txn) => {
478                self.transaction = TransactionStatus::Failed(txn);
479            }
480            TransactionStatus::Failed(_) => {}
481        };
482        self
483    }
484
485    /// Returns the current transaction status.
486    pub fn transaction(&self) -> &TransactionStatus<T> {
487        &self.transaction
488    }
489
490    /// Returns the current transaction status.
491    pub fn transaction_mut(&mut self) -> &mut TransactionStatus<T> {
492        &mut self.transaction
493    }
494
495    /// Returns the session's transaction code.
496    pub fn transaction_code(&self) -> TransactionCode {
497        self.transaction().into()
498    }
499
500    /// Adds operations to the current transaction. An error is produced if
501    /// they cannot be merged (i.e., a timestamp-dependent read cannot be
502    /// merged to an insert).
503    pub fn add_transaction_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
504        self.transaction.add_ops(add_ops)
505    }
506
507    /// Returns a channel on which to send notices to the session.
508    pub fn retain_notice_transmitter(&self) -> UnboundedSender<AdapterNotice> {
509        self.notices_tx.clone()
510    }
511
512    /// Adds a notice to the session.
513    pub fn add_notice(&self, notice: AdapterNotice) {
514        self.add_notices([notice])
515    }
516
517    /// Adds multiple notices to the session.
518    pub fn add_notices(&self, notices: impl IntoIterator<Item = AdapterNotice>) {
519        for notice in notices {
520            let _ = self.notices_tx.send(notice);
521        }
522    }
523
524    /// Awaits a possible notice.
525    ///
526    /// This method is cancel safe.
527    pub async fn recv_notice(&mut self) -> AdapterNotice {
528        // This method is cancel safe because recv is cancel safe.
529        loop {
530            let notice = self
531                .notices_rx
532                .recv()
533                .await
534                .expect("Session also holds a sender, so recv won't ever return None");
535            match self.notice_filter(notice) {
536                Some(notice) => return notice,
537                None => continue,
538            }
539        }
540    }
541
542    /// Returns a draining iterator over the notices attached to the session.
543    pub fn drain_notices(&mut self) -> Vec<AdapterNotice> {
544        let mut notices = Vec::new();
545        while let Ok(notice) = self.notices_rx.try_recv() {
546            if let Some(notice) = self.notice_filter(notice) {
547                notices.push(notice);
548            }
549        }
550        notices
551    }
552
553    /// Returns Some if the notice should be reported, otherwise None.
554    fn notice_filter(&self, notice: AdapterNotice) -> Option<AdapterNotice> {
555        // Filter out low threshold severity.
556        let minimum_client_severity = self.vars.client_min_messages();
557        let sev = notice.severity();
558        if !minimum_client_severity.should_output_to_client(&sev) {
559            return None;
560        }
561        // Filter out notices for other clusters.
562        if let AdapterNotice::ClusterReplicaStatusChanged { cluster, .. } = &notice {
563            if cluster != self.vars.cluster() {
564                return None;
565            }
566        }
567        Some(notice)
568    }
569
570    /// Sets the transaction ops to `TransactionOps::None`. Must only be used after
571    /// verifying that no transaction anomalies will occur if cleared.
572    pub fn clear_transaction_ops(&mut self) {
573        if let Some(txn) = self.transaction.inner_mut() {
574            txn.ops = TransactionOps::None;
575        }
576    }
577
578    /// If the current transaction ops belong to a read, then sets the
579    /// ops to `None`, returning the old read timestamp context if
580    /// any existed. Must only be used after verifying that no transaction
581    /// anomalies will occur if cleared.
582    pub fn take_transaction_timestamp_context(&mut self) -> Option<TimestampContext<T>> {
583        if let Some(Transaction { ops, .. }) = self.transaction.inner_mut() {
584            if let TransactionOps::Peeks { .. } = ops {
585                let ops = std::mem::take(ops);
586                Some(
587                    ops.timestamp_determination()
588                        .expect("checked above")
589                        .timestamp_context,
590                )
591            } else {
592                None
593            }
594        } else {
595            None
596        }
597    }
598
599    /// Returns the transaction's read timestamp determination, if set.
600    ///
601    /// Returns `None` if there is no active transaction, or if the active
602    /// transaction is not a read transaction.
603    pub fn get_transaction_timestamp_determination(&self) -> Option<TimestampDetermination<T>> {
604        match self.transaction.inner() {
605            Some(Transaction {
606                pcx: _,
607                ops: TransactionOps::Peeks { determination, .. },
608                write_lock_guards: _,
609                access: _,
610                id: _,
611            }) => Some(determination.clone()),
612            _ => None,
613        }
614    }
615
616    /// Whether this session has a timestamp for a read transaction.
617    pub fn contains_read_timestamp(&self) -> bool {
618        matches!(
619            self.transaction.inner(),
620            Some(Transaction {
621                pcx: _,
622                ops: TransactionOps::Peeks {
623                    determination: TimestampDetermination {
624                        timestamp_context: TimestampContext::TimelineTimestamp { .. },
625                        ..
626                    },
627                    ..
628                },
629                write_lock_guards: _,
630                access: _,
631                id: _,
632            })
633        )
634    }
635
636    /// Registers the prepared statement under `name`.
637    pub fn set_prepared_statement(
638        &mut self,
639        name: String,
640        stmt: Option<Statement<Raw>>,
641        raw_sql: String,
642        desc: StatementDesc,
643        state_revision: StateRevision,
644        now: EpochMillis,
645    ) {
646        let logging = PreparedStatementLoggingInfo::still_to_log(
647            raw_sql,
648            stmt.as_ref(),
649            now,
650            name.clone(),
651            self.uuid,
652            false,
653        );
654        let statement = PreparedStatement {
655            stmt,
656            desc,
657            state_revision,
658            logging: Arc::new(QCell::new(&self.qcell_owner, logging)),
659        };
660        self.prepared_statements.insert(name, statement);
661    }
662
663    /// Removes the prepared statement associated with `name`.
664    ///
665    /// Returns whether a statement previously existed.
666    pub fn remove_prepared_statement(&mut self, name: &str) -> bool {
667        self.prepared_statements.remove(name).is_some()
668    }
669
670    /// Removes all prepared statements.
671    pub fn remove_all_prepared_statements(&mut self) {
672        self.prepared_statements.clear();
673    }
674
675    /// Retrieves the prepared statement associated with `name`.
676    ///
677    /// This is unverified and could be incorrect if the underlying catalog has
678    /// changed.
679    pub fn get_prepared_statement_unverified(&self, name: &str) -> Option<&PreparedStatement> {
680        self.prepared_statements.get(name)
681    }
682
683    /// Retrieves the prepared statement associated with `name`.
684    ///
685    /// This is unverified and could be incorrect if the underlying catalog has
686    /// changed.
687    pub fn get_prepared_statement_mut_unverified(
688        &mut self,
689        name: &str,
690    ) -> Option<&mut PreparedStatement> {
691        self.prepared_statements.get_mut(name)
692    }
693
694    /// Returns the prepared statements for the session.
695    pub fn prepared_statements(&self) -> &BTreeMap<String, PreparedStatement> {
696        &self.prepared_statements
697    }
698
699    /// Returns the portals for the session.
700    pub fn portals(&self) -> &BTreeMap<String, Portal> {
701        &self.portals
702    }
703
704    /// Binds the specified portal to the specified prepared statement.
705    ///
706    /// If the prepared statement contains parameters, the values and types of
707    /// those parameters must be provided in `params`. It is the caller's
708    /// responsibility to ensure that the correct number of parameters is
709    /// provided.
710    ///
711    /// The `results_formats` parameter sets the desired format of the results,
712    /// and is stored on the portal.
713    pub fn set_portal(
714        &mut self,
715        portal_name: String,
716        desc: StatementDesc,
717        stmt: Option<Statement<Raw>>,
718        logging: Arc<QCell<PreparedStatementLoggingInfo>>,
719        params: Vec<(Datum, SqlScalarType)>,
720        result_formats: Vec<Format>,
721        state_revision: StateRevision,
722    ) -> Result<(), AdapterError> {
723        // The empty portal can be silently replaced.
724        if !portal_name.is_empty() && self.portals.contains_key(&portal_name) {
725            return Err(AdapterError::DuplicateCursor(portal_name));
726        }
727        self.state_revision += 1;
728        let param_types = desc.param_types.clone();
729        self.portals.insert(
730            portal_name,
731            Portal {
732                stmt: stmt.map(Arc::new),
733                desc,
734                state_revision,
735                parameters: Params {
736                    datums: Row::pack(params.iter().map(|(d, _t)| d)),
737                    execute_types: params.into_iter().map(|(_d, t)| t).collect(),
738                    expected_types: param_types,
739                },
740                result_formats,
741                state: PortalState::NotStarted,
742                logging,
743                lifecycle_timestamps: None,
744            },
745        );
746        Ok(())
747    }
748
749    /// Removes the specified portal.
750    ///
751    /// If there is no such portal, this method does nothing. Returns whether that portal existed.
752    pub fn remove_portal(&mut self, portal_name: &str) -> bool {
753        self.state_revision += 1;
754        self.portals.remove(portal_name).is_some()
755    }
756
757    /// Retrieves a reference to the specified portal.
758    ///
759    /// If there is no such portal, returns `None`.
760    pub fn get_portal_unverified(&self, portal_name: &str) -> Option<&Portal> {
761        self.portals.get(portal_name)
762    }
763
764    /// Retrieves a mutable reference to the specified portal.
765    ///
766    /// If there is no such portal, returns `None`.
767    ///
768    /// Note: When using the returned `PortalRefMut`, there is no need to increment
769    /// `Session::state_revision`, because the portal's meaning is not changed.
770    pub fn get_portal_unverified_mut(&mut self, portal_name: &str) -> Option<PortalRefMut<'_>> {
771        self.portals.get_mut(portal_name).map(|p| PortalRefMut {
772            stmt: &p.stmt,
773            desc: &p.desc,
774            state_revision: &mut p.state_revision,
775            parameters: &mut p.parameters,
776            result_formats: &mut p.result_formats,
777            logging: &mut p.logging,
778            state: &mut p.state,
779            lifecycle_timestamps: &mut p.lifecycle_timestamps,
780        })
781    }
782
783    /// Creates and installs a new portal.
784    pub fn create_new_portal(
785        &mut self,
786        stmt: Option<Statement<Raw>>,
787        logging: Arc<QCell<PreparedStatementLoggingInfo>>,
788        desc: StatementDesc,
789        parameters: Params,
790        result_formats: Vec<Format>,
791        state_revision: StateRevision,
792    ) -> Result<String, AdapterError> {
793        self.state_revision += 1;
794
795        // See: https://github.com/postgres/postgres/blob/84f5c2908dad81e8622b0406beea580e40bb03ac/src/backend/utils/mmgr/portalmem.c#L234
796        for i in 0usize.. {
797            let name = format!("<unnamed portal {}>", i);
798            match self.portals.entry(name.clone()) {
799                Entry::Occupied(_) => continue,
800                Entry::Vacant(entry) => {
801                    entry.insert(Portal {
802                        stmt: stmt.map(Arc::new),
803                        desc,
804                        state_revision,
805                        parameters,
806                        result_formats,
807                        state: PortalState::NotStarted,
808                        logging,
809                        lifecycle_timestamps: None,
810                    });
811                    return Ok(name);
812                }
813            }
814        }
815
816        coord_bail!("unable to create a new portal");
817    }
818
819    /// Resets the session to its initial state. Returns sinks that need to be
820    /// dropped.
821    pub fn reset(&mut self) {
822        let _ = self.clear_transaction();
823        self.prepared_statements.clear();
824        self.vars.reset_all();
825    }
826
827    /// Returns the [application_name] that created this session.
828    ///
829    /// [application_name]: (https://www.postgresql.org/docs/current/runtime-config-logging.html#GUC-APPLICATION-NAME)
830    pub fn application_name(&self) -> &str {
831        self.vars.application_name()
832    }
833
834    /// Returns a reference to the variables in this session.
835    pub fn vars(&self) -> &SessionVars {
836        &self.vars
837    }
838
839    /// Returns a mutable reference to the variables in this session.
840    pub fn vars_mut(&mut self) -> &mut SessionVars {
841        &mut self.vars
842    }
843
844    /// Grants a set of write locks to this session's inner [`Transaction`].
845    ///
846    /// # Panics
847    /// If the inner transaction is idle. See [`TransactionStatus::try_grant_write_locks`].
848    ///
849    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
850        self.transaction.try_grant_write_locks(guards)
851    }
852
853    /// Drains any external metadata updates and applies the changes from the latest update.
854    pub fn apply_external_metadata_updates(&mut self) {
855        // If no sender is registered then there isn't anything to do.
856        let Some(rx) = &mut self.external_metadata_rx else {
857            return;
858        };
859
860        // If the value hasn't changed then return.
861        if !rx.has_changed().unwrap_or(false) {
862            return;
863        }
864
865        // Update our metadata! Note the short critical section (just a clone) to avoid blocking
866        // the sending side of this watch channel.
867        let metadata = rx.borrow_and_update().clone();
868        self.vars.set_external_user_metadata(metadata);
869    }
870
871    /// Applies the internal user metadata to the session.
872    pub fn apply_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
873        self.vars.set_internal_user_metadata(metadata);
874    }
875
876    /// Initializes the session's role metadata.
877    pub fn initialize_role_metadata(&mut self, role_id: RoleId) {
878        self.role_metadata = Some(RoleMetadata::new(role_id));
879    }
880
881    /// Ensures that a timestamp oracle exists for `timeline` and returns a mutable reference to
882    /// the timestamp oracle.
883    pub fn ensure_timestamp_oracle(
884        &mut self,
885        timeline: Timeline,
886    ) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
887        self.session_oracles
888            .entry(timeline)
889            .or_insert_with(|| InMemoryTimestampOracle::new(T::minimum(), NowFn::from(T::minimum)))
890    }
891
892    /// Ensures that a timestamp oracle exists for reads and writes from/to a local input and
893    /// returns a mutable reference to the timestamp oracle.
894    pub fn ensure_local_timestamp_oracle(&mut self) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
895        self.ensure_timestamp_oracle(Timeline::EpochMilliseconds)
896    }
897
898    /// Returns a reference to the timestamp oracle for `timeline`.
899    pub fn get_timestamp_oracle(
900        &self,
901        timeline: &Timeline,
902    ) -> Option<&InMemoryTimestampOracle<T, NowFn<T>>> {
903        self.session_oracles.get(timeline)
904    }
905
906    /// If the current session is using the Strong Session Serializable isolation level advance the
907    /// session local timestamp oracle to `write_ts`.
908    pub fn apply_write(&mut self, timestamp: T) {
909        if self.vars().transaction_isolation() == &IsolationLevel::StrongSessionSerializable {
910            self.ensure_local_timestamp_oracle().apply_write(timestamp);
911        }
912    }
913
914    /// Returns the [`SessionMetrics`] instance associated with this [`Session`].
915    pub fn metrics(&self) -> &SessionMetrics {
916        &self.metrics
917    }
918
919    /// Sets the `BuiltinTableAppendNotify` for this session.
920    pub fn set_builtin_table_updates(&mut self, fut: BuiltinTableAppendNotify) {
921        let prev = self.builtin_updates.replace(fut);
922        mz_ore::soft_assert_or_log!(prev.is_none(), "replacing old builtin table notify");
923    }
924
925    /// Takes the stashed `BuiltinTableAppendNotify`, if one exists, and returns a [`Future`] that
926    /// waits for the writes to complete.
927    pub fn clear_builtin_table_updates(&mut self) -> Option<impl Future<Output = ()> + 'static> {
928        if let Some(fut) = self.builtin_updates.take() {
929            // Record how long we blocked for, if we blocked at all.
930            let histogram = self
931                .metrics()
932                .session_startup_table_writes_seconds()
933                .clone();
934            Some(async move {
935                fut.wall_time().observe(histogram).await;
936            })
937        } else {
938            None
939        }
940    }
941
942    /// Return the state_revision of the session, which can be used by dependent objects for knowing
943    /// when to re-plan due to session state changes.
944    pub fn state_revision(&self) -> u64 {
945        self.state_revision
946    }
947}
948
949/// A prepared statement.
950#[derive(Derivative, Clone)]
951#[derivative(Debug)]
952pub struct PreparedStatement {
953    stmt: Option<Statement<Raw>>,
954    desc: StatementDesc,
955    /// The most recent state revision that has verified this statement.
956    pub state_revision: StateRevision,
957    #[derivative(Debug = "ignore")]
958    logging: Arc<QCell<PreparedStatementLoggingInfo>>,
959}
960
961impl PreparedStatement {
962    /// Returns the AST associated with this prepared statement,
963    /// if the prepared statement was not the empty query.
964    pub fn stmt(&self) -> Option<&Statement<Raw>> {
965        self.stmt.as_ref()
966    }
967
968    /// Returns the description of the prepared statement.
969    pub fn desc(&self) -> &StatementDesc {
970        &self.desc
971    }
972
973    /// Returns a handle to the metadata for statement logging.
974    pub fn logging(&self) -> &Arc<QCell<PreparedStatementLoggingInfo>> {
975        &self.logging
976    }
977}
978
979/// A portal represents the execution state of a running or runnable query.
980#[derive(Derivative)]
981#[derivative(Debug)]
982pub struct Portal {
983    /// The statement that is bound to this portal.
984    pub stmt: Option<Arc<Statement<Raw>>>,
985    /// The statement description.
986    pub desc: StatementDesc,
987    /// The most recent state revision that has verified this portal.
988    pub state_revision: StateRevision,
989    /// The bound values for the parameters in the prepared statement, if any.
990    pub parameters: Params,
991    /// The desired output format for each column in the result set.
992    pub result_formats: Vec<Format>,
993    /// A handle to metadata needed for statement logging.
994    #[derivative(Debug = "ignore")]
995    pub logging: Arc<QCell<PreparedStatementLoggingInfo>>,
996    /// The execution state of the portal.
997    #[derivative(Debug = "ignore")]
998    pub state: PortalState,
999    /// Statement lifecycle timestamps coming from `mz-pgwire`.
1000    pub lifecycle_timestamps: Option<LifecycleTimestamps>,
1001}
1002
1003/// A mutable reference to a portal, capturing its state and associated metadata. Importantly, it
1004/// does _not_ give _mutable_ access to `stmt` and `desc`, which means that you do not need to
1005/// increment `Session::state_revision` when modifying fields through a `PortalRefMut`, because the
1006/// portal's meaning is not changed.
1007pub struct PortalRefMut<'a> {
1008    /// The statement that is bound to this portal.
1009    pub stmt: &'a Option<Arc<Statement<Raw>>>,
1010    /// The statement description.
1011    pub desc: &'a StatementDesc,
1012    /// The most recent state revision that has verified this portal.
1013    pub state_revision: &'a mut StateRevision,
1014    /// The bound values for the parameters in the prepared statement, if any.
1015    pub parameters: &'a mut Params,
1016    /// The desired output format for each column in the result set.
1017    pub result_formats: &'a mut Vec<Format>,
1018    /// A handle to metadata needed for statement logging.
1019    pub logging: &'a mut Arc<QCell<PreparedStatementLoggingInfo>>,
1020    /// The execution state of the portal.
1021    pub state: &'a mut PortalState,
1022    /// Statement lifecycle timestamps coming from `mz-pgwire`.
1023    pub lifecycle_timestamps: &'a mut Option<LifecycleTimestamps>,
1024}
1025
/// Points to a revision of catalog state and session state. When the current revisions are not the
/// same as the revisions when a prepared statement or a portal was described, we need to check
/// whether the description is still valid.
///
/// Both components are plain integers, so equality is total and the type implements [`Eq`] in
/// addition to [`PartialEq`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StateRevision {
    /// A revision of the catalog.
    pub catalog_revision: u64,
    /// A revision of the session state.
    pub session_state_revision: u64,
}
1036
1037/// Execution states of a portal.
1038pub enum PortalState {
1039    /// Portal not yet started.
1040    NotStarted,
1041    /// Portal is a rows-returning statement in progress with 0 or more rows
1042    /// remaining.
1043    InProgress(Option<InProgressRows>),
1044    /// Portal has completed and should not be re-executed. If the optional string
1045    /// is present, it is returned as a CommandComplete tag, otherwise an error
1046    /// is sent.
1047    Completed(Option<String>),
1048}
1049
1050/// State of an in-progress, rows-returning portal.
1051pub struct InProgressRows {
1052    /// The current batch of rows.
1053    pub current: Option<Box<dyn RowIterator + Send + Sync>>,
1054    /// A stream from which to fetch more row batches.
1055    pub remaining: RecordFirstRowStream,
1056}
1057
1058impl InProgressRows {
1059    /// Creates a new InProgressRows from a batch stream.
1060    pub fn new(remaining: RecordFirstRowStream) -> Self {
1061        Self {
1062            current: None,
1063            remaining,
1064        }
1065    }
1066
1067    /// Determines whether the underlying stream has ended and there are also no more rows
1068    /// stashed in `current`.
1069    pub fn no_more_rows(&self) -> bool {
1070        self.remaining.no_more_rows && self.current.is_none()
1071    }
1072}
1073
1074/// A channel of batched rows.
1075pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
1076
1077/// Part of statement lifecycle. These are timestamps that come from the Adapter frontend
1078/// (`mz-pgwire`) part of the lifecycle.
1079#[derive(Debug, Clone)]
1080pub struct LifecycleTimestamps {
1081    /// When the query was received. More specifically, when the tokio recv returned.
1082    /// For a Simple Query, this is for the whole query, for the Extended Query flow, this is only
1083    /// for `FrontendMessage::Execute`. (This means that this is after parsing for the
1084    /// Extended Query flow.)
1085    pub received: EpochMillis,
1086}
1087
1088impl LifecycleTimestamps {
1089    /// Creates a new `LifecycleTimestamps`.
1090    pub fn new(received: EpochMillis) -> Self {
1091        Self { received }
1092    }
1093}
1094
1095/// The transaction status of a session.
1096///
1097/// PostgreSQL's transaction states are in backend/access/transam/xact.c.
1098#[derive(Debug)]
1099pub enum TransactionStatus<T> {
1100    /// Idle. Matches `TBLOCK_DEFAULT`.
1101    Default,
1102    /// Running a single-query transaction. Matches
1103    /// `TBLOCK_STARTED`. In PostgreSQL, when using the extended query protocol, this
1104    /// may be upgraded into multi-statement implicit query (see [`Self::InTransactionImplicit`]).
1105    /// Additionally, some statements may trigger an eager commit of the implicit transaction,
1106    /// see: <https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137>. In
1107    /// Materialize however, we eagerly commit all statements outside of an explicit transaction
1108    /// when using the extended query protocol. Therefore, we can guarantee that this state will
1109    /// always be a single-query transaction and never be upgraded into a multi-statement implicit
1110    /// query.
1111    Started(Transaction<T>),
1112    /// Currently in a transaction issued from a `BEGIN`. Matches `TBLOCK_INPROGRESS`.
1113    InTransaction(Transaction<T>),
1114    /// Currently in an implicit transaction started from a multi-statement query
1115    /// with more than 1 statements. Matches `TBLOCK_IMPLICIT_INPROGRESS`.
1116    InTransactionImplicit(Transaction<T>),
1117    /// In a failed transaction. Matches `TBLOCK_ABORT`.
1118    Failed(Transaction<T>),
1119}
1120
1121impl<T: TimestampManipulation> TransactionStatus<T> {
1122    /// Extracts the inner transaction ops and write lock guard if not failed.
1123    pub fn into_ops_and_lock_guard(self) -> (Option<TransactionOps<T>>, Option<WriteLocks>) {
1124        match self {
1125            TransactionStatus::Default | TransactionStatus::Failed(_) => (None, None),
1126            TransactionStatus::Started(txn)
1127            | TransactionStatus::InTransaction(txn)
1128            | TransactionStatus::InTransactionImplicit(txn) => {
1129                (Some(txn.ops), txn.write_lock_guards)
1130            }
1131        }
1132    }
1133
1134    /// Exposes the inner transaction.
1135    pub fn inner(&self) -> Option<&Transaction<T>> {
1136        match self {
1137            TransactionStatus::Default => None,
1138            TransactionStatus::Started(txn)
1139            | TransactionStatus::InTransaction(txn)
1140            | TransactionStatus::InTransactionImplicit(txn)
1141            | TransactionStatus::Failed(txn) => Some(txn),
1142        }
1143    }
1144
1145    /// Exposes the inner transaction.
1146    pub fn inner_mut(&mut self) -> Option<&mut Transaction<T>> {
1147        match self {
1148            TransactionStatus::Default => None,
1149            TransactionStatus::Started(txn)
1150            | TransactionStatus::InTransaction(txn)
1151            | TransactionStatus::InTransactionImplicit(txn)
1152            | TransactionStatus::Failed(txn) => Some(txn),
1153        }
1154    }
1155
1156    /// Whether the transaction's ops are DDL.
1157    pub fn is_ddl(&self) -> bool {
1158        match self {
1159            TransactionStatus::Default => false,
1160            TransactionStatus::Started(txn)
1161            | TransactionStatus::InTransaction(txn)
1162            | TransactionStatus::InTransactionImplicit(txn)
1163            | TransactionStatus::Failed(txn) => {
1164                matches!(txn.ops, TransactionOps::DDL { .. })
1165            }
1166        }
1167    }
1168
1169    /// Expresses whether or not the transaction was implicitly started.
1170    /// However, its negation does not imply explicitly started.
1171    pub fn is_implicit(&self) -> bool {
1172        match self {
1173            TransactionStatus::Started(_) | TransactionStatus::InTransactionImplicit(_) => true,
1174            TransactionStatus::Default
1175            | TransactionStatus::InTransaction(_)
1176            | TransactionStatus::Failed(_) => false,
1177        }
1178    }
1179
1180    /// Whether the transaction may contain multiple statements.
1181    pub fn is_in_multi_statement_transaction(&self) -> bool {
1182        match self {
1183            TransactionStatus::InTransaction(_) | TransactionStatus::InTransactionImplicit(_) => {
1184                true
1185            }
1186            TransactionStatus::Default
1187            | TransactionStatus::Started(_)
1188            | TransactionStatus::Failed(_) => false,
1189        }
1190    }
1191
1192    /// Whether we are in a multi-statement transaction, AND the query is immediate.
1193    pub fn in_immediate_multi_stmt_txn(&self, when: &QueryWhen) -> bool {
1194        self.is_in_multi_statement_transaction() && when == &QueryWhen::Immediately
1195    }
1196
1197    /// Grants the writes lock to the inner transaction, returning an error if the transaction
1198    /// has already been granted write locks.
1199    ///
1200    /// # Panics
1201    /// If `self` is `TransactionStatus::Default`, which indicates that the
1202    /// transaction is idle, which is not appropriate to assign the
1203    /// coordinator's write lock to.
1204    ///
1205    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1206        match self {
1207            TransactionStatus::Default => panic!("cannot grant write lock to txn not yet started"),
1208            TransactionStatus::Started(txn)
1209            | TransactionStatus::InTransaction(txn)
1210            | TransactionStatus::InTransactionImplicit(txn)
1211            | TransactionStatus::Failed(txn) => txn.try_grant_write_locks(guards),
1212        }
1213    }
1214
1215    /// Returns the currently held [`WriteLocks`], if this transaction holds any.
1216    pub fn write_locks(&self) -> Option<&WriteLocks> {
1217        match self {
1218            TransactionStatus::Default => None,
1219            TransactionStatus::Started(txn)
1220            | TransactionStatus::InTransaction(txn)
1221            | TransactionStatus::InTransactionImplicit(txn)
1222            | TransactionStatus::Failed(txn) => txn.write_lock_guards.as_ref(),
1223        }
1224    }
1225
1226    /// The timeline of the transaction, if one exists.
1227    pub fn timeline(&self) -> Option<Timeline> {
1228        match self {
1229            TransactionStatus::Default => None,
1230            TransactionStatus::Started(txn)
1231            | TransactionStatus::InTransaction(txn)
1232            | TransactionStatus::InTransactionImplicit(txn)
1233            | TransactionStatus::Failed(txn) => txn.timeline(),
1234        }
1235    }
1236
1237    /// The cluster of the transaction, if one exists.
1238    pub fn cluster(&self) -> Option<ClusterId> {
1239        match self {
1240            TransactionStatus::Default => None,
1241            TransactionStatus::Started(txn)
1242            | TransactionStatus::InTransaction(txn)
1243            | TransactionStatus::InTransactionImplicit(txn)
1244            | TransactionStatus::Failed(txn) => txn.cluster(),
1245        }
1246    }
1247
1248    /// Snapshot of the catalog that reflects DDL operations run in this transaction.
1249    pub fn catalog_state(&self) -> Option<&CatalogState> {
1250        match self.inner() {
1251            Some(Transaction {
1252                ops: TransactionOps::DDL { state, .. },
1253                ..
1254            }) => Some(state),
1255            _ => None,
1256        }
1257    }
1258
1259    /// Reports whether any operations have been executed as part of this transaction
1260    pub fn contains_ops(&self) -> bool {
1261        match self.inner() {
1262            Some(txn) => txn.contains_ops(),
1263            None => false,
1264        }
1265    }
1266
1267    /// Checks whether the current state of this transaction allows writes
1268    /// (adding write ops).
1269    /// transaction
1270    pub fn allows_writes(&self) -> bool {
1271        match self {
1272            TransactionStatus::Started(Transaction { ops, access, .. })
1273            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1274            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1275                match ops {
1276                    TransactionOps::None => access != &Some(TransactionAccessMode::ReadOnly),
1277                    TransactionOps::Peeks { determination, .. } => {
1278                        // If-and-only-if peeks thus far do not have a timestamp
1279                        // (i.e. they are constant), we can switch to a write
1280                        // transaction.
1281                        !determination.timestamp_context.contains_timestamp()
1282                    }
1283                    TransactionOps::Subscribe => false,
1284                    TransactionOps::Writes(_) => true,
1285                    TransactionOps::SingleStatement { .. } => false,
1286                    TransactionOps::DDL { .. } => false,
1287                }
1288            }
1289            TransactionStatus::Default | TransactionStatus::Failed(_) => {
1290                unreachable!()
1291            }
1292        }
1293    }
1294
1295    /// Adds operations to the current transaction. An error is produced if they cannot be merged
1296    /// (i.e., a timestamp-dependent read cannot be merged to an insert).
1297    ///
1298    /// The `DDL` variant is an exception and does not merge operations, but instead overwrites the
1299    /// old ops with the new ops. This is correct because it is only used in conjunction with the
1300    /// Dry Run catalog op which returns an error containing all of the ops, and those ops are
1301    /// passed to this function which then overwrites.
1302    ///
1303    /// # Panics
1304    /// If the operations are compatible but the operation metadata doesn't match. Such as reads at
1305    /// different timestamps, reads on different timelines, reads on different clusters, etc. It's
1306    /// up to the caller to make sure these are aligned.
1307    pub fn add_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
1308        match self {
1309            TransactionStatus::Started(Transaction { ops, access, .. })
1310            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1311            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1312                match ops {
1313                    TransactionOps::None => {
1314                        if matches!(access, Some(TransactionAccessMode::ReadOnly))
1315                            && matches!(add_ops, TransactionOps::Writes(_))
1316                        {
1317                            return Err(AdapterError::ReadOnlyTransaction);
1318                        }
1319                        *ops = add_ops;
1320                    }
1321                    TransactionOps::Peeks {
1322                        determination,
1323                        cluster_id,
1324                        requires_linearization,
1325                    } => match add_ops {
1326                        TransactionOps::Peeks {
1327                            determination: add_timestamp_determination,
1328                            cluster_id: add_cluster_id,
1329                            requires_linearization: add_requires_linearization,
1330                        } => {
1331                            assert_eq!(*cluster_id, add_cluster_id);
1332                            match (
1333                                &determination.timestamp_context,
1334                                &add_timestamp_determination.timestamp_context,
1335                            ) {
1336                                (
1337                                    TimestampContext::TimelineTimestamp {
1338                                        timeline: txn_timeline,
1339                                        chosen_ts: txn_ts,
1340                                        oracle_ts: _,
1341                                    },
1342                                    TimestampContext::TimelineTimestamp {
1343                                        timeline: add_timeline,
1344                                        chosen_ts: add_ts,
1345                                        oracle_ts: _,
1346                                    },
1347                                ) => {
1348                                    assert_eq!(txn_timeline, add_timeline);
1349                                    assert_eq!(txn_ts, add_ts);
1350                                }
1351                                (TimestampContext::NoTimestamp, _) => {
1352                                    *determination = add_timestamp_determination
1353                                }
1354                                (_, TimestampContext::NoTimestamp) => {}
1355                            };
1356                            if matches!(requires_linearization, RequireLinearization::NotRequired)
1357                                && matches!(
1358                                    add_requires_linearization,
1359                                    RequireLinearization::Required
1360                                )
1361                            {
1362                                *requires_linearization = add_requires_linearization;
1363                            }
1364                        }
1365                        // Iff peeks thus far do not have a timestamp (i.e.
1366                        // they are constant), we can switch to a write
1367                        // transaction.
1368                        writes @ TransactionOps::Writes(..)
1369                            if !determination.timestamp_context.contains_timestamp() =>
1370                        {
1371                            *ops = writes;
1372                        }
1373                        _ => return Err(AdapterError::ReadOnlyTransaction),
1374                    },
1375                    TransactionOps::Subscribe => {
1376                        return Err(AdapterError::SubscribeOnlyTransaction);
1377                    }
1378                    TransactionOps::Writes(txn_writes) => match add_ops {
1379                        TransactionOps::Writes(mut add_writes) => {
1380                            // We should have already checked the access above, but make sure we don't miss
1381                            // it anyway.
1382                            assert!(!matches!(access, Some(TransactionAccessMode::ReadOnly)));
1383                            txn_writes.append(&mut add_writes);
1384                        }
1385                        // Iff peeks do not have a timestamp (i.e. they are
1386                        // constant), we can permit them.
1387                        TransactionOps::Peeks { determination, .. }
1388                            if !determination.timestamp_context.contains_timestamp() => {}
1389                        _ => {
1390                            return Err(AdapterError::WriteOnlyTransaction);
1391                        }
1392                    },
1393                    TransactionOps::SingleStatement { .. } => {
1394                        return Err(AdapterError::SingleStatementTransaction);
1395                    }
1396                    TransactionOps::DDL {
1397                        ops: og_ops,
1398                        revision: og_revision,
1399                        state: og_state,
1400                        side_effects,
1401                        snapshot: og_snapshot,
1402                    } => match add_ops {
1403                        TransactionOps::DDL {
1404                            ops: new_ops,
1405                            revision: new_revision,
1406                            side_effects: mut net_new_side_effects,
1407                            state: new_state,
1408                            snapshot: new_snapshot,
1409                        } => {
1410                            if *og_revision != new_revision {
1411                                return Err(AdapterError::DDLTransactionRace);
1412                            }
1413                            // The old og_ops are overwritten, not extended.
1414                            if !new_ops.is_empty() {
1415                                *og_ops = new_ops;
1416                                *og_state = new_state;
1417                                *og_snapshot = new_snapshot;
1418                            }
1419                            side_effects.append(&mut net_new_side_effects);
1420                        }
1421                        _ => return Err(AdapterError::DDLOnlyTransaction),
1422                    },
1423                }
1424            }
1425            TransactionStatus::Default | TransactionStatus::Failed(_) => {
1426                unreachable!()
1427            }
1428        }
1429        Ok(())
1430    }
1431}
1432
/// The identifier type used to tell different transactions apart.
pub type TransactionId = u64;
1435
1436impl<T> Default for TransactionStatus<T> {
1437    fn default() -> Self {
1438        TransactionStatus::Default
1439    }
1440}
1441
1442/// State data for transactions.
1443#[derive(Debug)]
1444pub struct Transaction<T> {
1445    /// Plan context.
1446    pub pcx: PlanContext,
1447    /// Transaction operations.
1448    pub ops: TransactionOps<T>,
1449    /// Uniquely identifies the transaction on a per connection basis.
1450    /// Two transactions started from separate connections may share the
1451    /// same ID.
1452    /// If all IDs have been exhausted, this will wrap around back to 0.
1453    pub id: TransactionId,
1454    /// Locks for objects this transaction will operate on.
1455    write_lock_guards: Option<WriteLocks>,
1456    /// Access mode (read only, read write).
1457    access: Option<TransactionAccessMode>,
1458}
1459
1460impl<T> Transaction<T> {
1461    /// Tries to grant the write lock to this transaction for the remainder of its lifetime. Errors
1462    /// if this [`Transaction`] has already been granted write locks.
1463    fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1464        match &mut self.write_lock_guards {
1465            Some(existing) => Err(existing),
1466            locks @ None => {
1467                *locks = Some(guards);
1468                Ok(())
1469            }
1470        }
1471    }
1472
1473    /// The timeline of the transaction, if one exists.
1474    fn timeline(&self) -> Option<Timeline> {
1475        match &self.ops {
1476            TransactionOps::Peeks {
1477                determination:
1478                    TimestampDetermination {
1479                        timestamp_context: TimestampContext::TimelineTimestamp { timeline, .. },
1480                        ..
1481                    },
1482                ..
1483            } => Some(timeline.clone()),
1484            TransactionOps::Peeks { .. }
1485            | TransactionOps::None
1486            | TransactionOps::Subscribe
1487            | TransactionOps::Writes(_)
1488            | TransactionOps::SingleStatement { .. }
1489            | TransactionOps::DDL { .. } => None,
1490        }
1491    }
1492
1493    /// The cluster of the transaction, if one exists.
1494    pub fn cluster(&self) -> Option<ClusterId> {
1495        match &self.ops {
1496            TransactionOps::Peeks { cluster_id, .. } => Some(cluster_id.clone()),
1497            TransactionOps::None
1498            | TransactionOps::Subscribe
1499            | TransactionOps::Writes(_)
1500            | TransactionOps::SingleStatement { .. }
1501            | TransactionOps::DDL { .. } => None,
1502        }
1503    }
1504
1505    /// Reports whether any operations have been executed as part of this transaction
1506    fn contains_ops(&self) -> bool {
1507        !matches!(self.ops, TransactionOps::None)
1508    }
1509}
1510
/// The coarse status code of a transaction.
#[derive(Debug, Clone, Copy)]
pub enum TransactionCode {
    /// No transaction is currently active.
    Idle,
    /// A transaction is currently active.
    InTransaction,
    /// A transaction block is currently active, but it has failed.
    Failed,
}
1521
1522impl From<TransactionCode> for u8 {
1523    fn from(code: TransactionCode) -> Self {
1524        match code {
1525            TransactionCode::Idle => b'I',
1526            TransactionCode::InTransaction => b'T',
1527            TransactionCode::Failed => b'E',
1528        }
1529    }
1530}
1531
1532impl From<TransactionCode> for String {
1533    fn from(code: TransactionCode) -> Self {
1534        char::from(u8::from(code)).to_string()
1535    }
1536}
1537
1538impl<T> From<&TransactionStatus<T>> for TransactionCode {
1539    /// Convert from the Session's version
1540    fn from(status: &TransactionStatus<T>) -> TransactionCode {
1541        match status {
1542            TransactionStatus::Default => TransactionCode::Idle,
1543            TransactionStatus::Started(_) => TransactionCode::InTransaction,
1544            TransactionStatus::InTransaction(_) => TransactionCode::InTransaction,
1545            TransactionStatus::InTransactionImplicit(_) => TransactionCode::InTransaction,
1546            TransactionStatus::Failed(_) => TransactionCode::Failed,
1547        }
1548    }
1549}
1550
/// The type of operation being performed by the transaction.
///
/// This is needed because we currently do not allow mixing reads and writes in
/// a transaction. Use this to record what we have done, and what may need to
/// happen at commit.
#[derive(Derivative)]
#[derivative(Debug)]
pub enum TransactionOps<T> {
    /// The transaction has been initiated, but no statement has yet been executed
    /// in it.
    None,
    /// This transaction has had a peek (`SELECT`, `SUBSCRIBE`). If the inner value
    /// has a timestamp, the transaction must only do other peeks. However, if it
    /// doesn't have a timestamp (i.e. the values are constants), the transaction
    /// can still perform writes.
    Peeks {
        /// The timestamp and timestamp related metadata for the peek.
        determination: TimestampDetermination<T>,
        /// The cluster used to execute peeks.
        cluster_id: ClusterId,
        /// Whether this peek needs to be linearized.
        requires_linearization: RequireLinearization,
    },
    /// This transaction has done a `SUBSCRIBE` and must do nothing else.
    Subscribe,
    /// This transaction has had a write (`INSERT`, `UPDATE`, `DELETE`) and must
    /// only do other writes, or reads whose timestamp is None (i.e. constants).
    Writes(Vec<WriteOp>),
    /// This transaction has a prospective statement that will execute during commit.
    SingleStatement {
        /// The prospective statement.
        stmt: Arc<Statement<Raw>>,
        /// The statement params.
        params: mz_sql::plan::Params,
    },
    /// This transaction has run some _simple_ DDL and must do nothing else. Any statement/plan that
    /// uses this must return false in `must_serialize_ddl()` because this is serialized instead in
    /// `sequence_plan()` during `COMMIT`.
    DDL {
        /// Catalog operations that have already run, and must run before each subsequent op.
        ops: Vec<crate::catalog::Op>,
        /// In-memory state that reflects the previously applied ops.
        state: CatalogState,
        /// A list of side effects that should be executed if this DDL transaction commits.
        ///
        /// Each entry is a boxed one-shot closure that receives the coordinator and an
        /// optional execute context and returns a future. The field is excluded from the
        /// derived `Debug` output.
        #[derivative(Debug = "ignore")]
        side_effects: Vec<
            Box<
                dyn for<'a> FnOnce(
                        &'a mut Coordinator,
                        Option<&'a mut ExecuteContext>,
                    ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
                    + Send
                    + Sync,
            >,
        >,
        /// Transient revision of the `Catalog` when this transaction started.
        revision: u64,
        /// Snapshot of the durable transaction state after the last dry run.
        /// Used to initialize the next dry run's transaction so it starts
        /// in sync with the accumulated `state`. `None` for the first
        /// statement in the transaction (before any dry run).
        snapshot: Option<Snapshot>,
    },
}
1615
1616impl<T> TransactionOps<T> {
1617    fn timestamp_determination(self) -> Option<TimestampDetermination<T>> {
1618        match self {
1619            TransactionOps::Peeks { determination, .. } => Some(determination),
1620            TransactionOps::None
1621            | TransactionOps::Subscribe
1622            | TransactionOps::Writes(_)
1623            | TransactionOps::SingleStatement { .. }
1624            | TransactionOps::DDL { .. } => None,
1625        }
1626    }
1627}
1628
1629impl<T> Default for TransactionOps<T> {
1630    fn default() -> Self {
1631        Self::None
1632    }
1633}
1634
/// An `INSERT` waiting to be committed.
///
/// Pending writes of a transaction are collected in [`TransactionOps::Writes`].
#[derive(Debug, Clone, PartialEq)]
pub struct WriteOp {
    /// The target table.
    pub id: CatalogItemId,
    /// The data rows.
    pub rows: TableData,
}
1643
/// Whether a transaction requires linearization.
///
/// Stored as the `requires_linearization` field of [`TransactionOps::Peeks`].
#[derive(Debug)]
pub enum RequireLinearization {
    /// Linearization is required.
    Required,
    /// Linearization is not required.
    NotRequired,
}
1652
1653impl From<&ExplainContext> for RequireLinearization {
1654    fn from(ctx: &ExplainContext) -> Self {
1655        match ctx {
1656            ExplainContext::None | ExplainContext::PlanInsightsNotice(_) => {
1657                RequireLinearization::Required
1658            }
1659            _ => RequireLinearization::NotRequired,
1660        }
1661    }
1662}
1663
/// A complete set of exclusive locks for writing to collections identified by [`CatalogItemId`]s.
///
/// To prevent deadlocks between two sessions, we do not allow acquiring a partial set of locks.
#[derive(Debug)]
pub struct WriteLocks {
    /// The held guards, keyed by the [`CatalogItemId`] each one was acquired for.
    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
    /// Connection that currently holds these locks, used for tracing purposes only.
    conn_id: ConnectionId,
}
1673
1674impl WriteLocks {
1675    /// Create a [`WriteLocksBuilder`] pre-defining all of the locks we need.
1676    ///
1677    /// When "finishing" the builder with [`WriteLocksBuilder::all_or_nothing`], if we haven't
1678    /// acquired all of the necessary locks we drop any partially acquired ones.
1679    pub fn builder(sources: impl IntoIterator<Item = CatalogItemId>) -> WriteLocksBuilder {
1680        let locks = sources.into_iter().map(|gid| (gid, None)).collect();
1681        WriteLocksBuilder { locks }
1682    }
1683
1684    /// Validate this set of [`WriteLocks`] is sufficient for the provided collections.
1685    /// Dropping the currently held locks if it's not.
1686    pub fn validate(
1687        self,
1688        collections: impl Iterator<Item = CatalogItemId>,
1689    ) -> Result<Self, BTreeSet<CatalogItemId>> {
1690        let mut missing = BTreeSet::new();
1691        for collection in collections {
1692            if !self.locks.contains_key(&collection) {
1693                missing.insert(collection);
1694            }
1695        }
1696
1697        if missing.is_empty() {
1698            Ok(self)
1699        } else {
1700            // Explicitly drop the already acquired locks.
1701            drop(self);
1702            Err(missing)
1703        }
1704    }
1705}
1706
1707impl Drop for WriteLocks {
1708    fn drop(&mut self) {
1709        // We may have merged the locks into GroupCommitWriteLocks, thus it could be empty.
1710        if !self.locks.is_empty() {
1711            tracing::info!(
1712                conn_id = %self.conn_id,
1713                locks = ?self.locks,
1714                "dropping write locks",
1715            );
1716        }
1717    }
1718}
1719
/// A builder struct that helps us acquire all of the locks we need, or none of them.
///
/// See [`WriteLocks::builder`].
#[derive(Debug)]
pub struct WriteLocksBuilder {
    /// Every lock we need, mapped to its guard once acquired (`None` until then).
    locks: BTreeMap<CatalogItemId, Option<tokio::sync::OwnedMutexGuard<()>>>,
}
1727
1728impl WriteLocksBuilder {
1729    /// Adds a lock to this builder.
1730    pub fn insert_lock(&mut self, id: CatalogItemId, lock: tokio::sync::OwnedMutexGuard<()>) {
1731        self.locks.insert(id, Some(lock));
1732    }
1733
1734    /// Finish this builder by returning either all of the necessary locks, or none of them.
1735    ///
1736    /// If we fail to acquire all of the locks, returns one of the [`CatalogItemId`]s that we
1737    /// failed to acquire a lock for, that should be awaited so we know when to run again.
1738    pub fn all_or_nothing(self, conn_id: &ConnectionId) -> Result<WriteLocks, CatalogItemId> {
1739        let (locks, missing): (BTreeMap<_, _>, BTreeSet<_>) =
1740            self.locks
1741                .into_iter()
1742                .partition_map(|(gid, lock)| match lock {
1743                    Some(lock) => itertools::Either::Left((gid, lock)),
1744                    None => itertools::Either::Right(gid),
1745                });
1746
1747        match missing.iter().next() {
1748            None => {
1749                tracing::info!(%conn_id, ?locks, "acquired write locks");
1750                Ok(WriteLocks {
1751                    locks,
1752                    conn_id: conn_id.clone(),
1753                })
1754            }
1755            Some(gid) => {
1756                tracing::info!(?missing, "failed to acquire write locks");
1757                // Explicitly drop the already acquired locks.
1758                drop(locks);
1759                Err(*gid)
1760            }
1761        }
1762    }
1763}
1764
/// Collection of [`WriteLocks`] gathered during [`group_commit`].
///
/// Note: This struct should __never__ be used outside of group commit because it attempts to merge
/// together several collections of [`WriteLocks`] which if not done carefully can cause deadlocks
/// or consistency violations.
///
/// We must prevent writes from occurring to tables during read then write plans (e.g. `UPDATE`)
/// but we can allow blind writes (e.g. `INSERT`) to get committed concurrently at the same
/// timestamp when submitting the updates from a read then write plan.
///
/// Naively it would seem as though we could allow blind writes to occur at any time, since blind
/// writes could never cause invalid retractions, but doing so could violate serializability
/// because there is no total order we could define for the transactions. Consider the following
/// scenario:
///
/// ```text
/// table: foo
///
///  a | b
/// --------
///  x   2
///  y   3
///  z   4
///
/// -- Session(A)
/// -- read then write plan, reads at t0, writes at t3, transaction Ta
/// DELETE FROM foo WHERE b % 2 = 0;
///
///
/// -- Session(B)
/// -- blind write into foo, writes at t1, transaction Tb
/// INSERT INTO foo VALUES ('q', 6);
/// -- select from foo, reads at t2, transaction Tc
/// SELECT * FROM foo;
///
///
/// The times these operations occur at are ordered:
/// t0 < t1 < t2 < t3
///
/// Given the timing of the operations, the transactions must have the following order:
///
/// * Ta does not observe ('q', 6), so Ta < Tb
/// * Tc does observe ('q', 6), so Tb < Tc
/// * Tc does not observe the retractions from Ta, so Tc < Ta
///
/// For total order to exist, Ta < Tb < Tc < Ta, which is impossible.
/// ```
///
/// [`group_commit`]: super::coord::Coordinator::group_commit
#[derive(Debug, Default)]
pub(crate) struct GroupCommitWriteLocks {
    /// The merged guards, keyed by the [`CatalogItemId`] each one was acquired for.
    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
}
1817
1818impl GroupCommitWriteLocks {
1819    /// Merge a set of [`WriteLocks`] into this collection for group commit.
1820    pub fn merge(&mut self, mut locks: WriteLocks) {
1821        // Note: Ideally we would use `.drain`, but that method doesn't exist for BTreeMap.
1822        //
1823        // See: <https://github.com/rust-lang/rust/issues/81074>
1824        let existing = std::mem::take(&mut locks.locks);
1825        self.locks.extend(existing);
1826    }
1827
1828    /// Returns the collections we're missing locks for, if any.
1829    pub fn missing_locks(
1830        &self,
1831        writes: impl Iterator<Item = CatalogItemId>,
1832    ) -> BTreeSet<CatalogItemId> {
1833        let mut missing = BTreeSet::new();
1834        for write in writes {
1835            if !self.locks.contains_key(&write) {
1836                missing.insert(write);
1837            }
1838        }
1839        missing
1840    }
1841}
1842
1843impl Drop for GroupCommitWriteLocks {
1844    fn drop(&mut self) {
1845        if !self.locks.is_empty() {
1846            tracing::info!(
1847                locks = ?self.locks,
1848                "dropping group commit write locks",
1849            );
1850        }
1851    }
1852}