mz_adapter/
session.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Per-connection configuration parameters and state.
11
12#![warn(missing_docs)]
13
14use std::collections::btree_map::Entry;
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Debug;
17use std::future::Future;
18use std::mem;
19use std::net::IpAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use derivative::Derivative;
25use itertools::Itertools;
26use mz_adapter_types::connection::ConnectionId;
27use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
28use mz_controller_types::ClusterId;
29use mz_ore::metrics::{MetricsFutureExt, MetricsRegistry};
30use mz_ore::now::{EpochMillis, NowFn};
31use mz_pgwire_common::Format;
32use mz_repr::role_id::RoleId;
33use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
34use mz_repr::{CatalogItemId, Datum, Row, RowIterator, ScalarType, TimestampManipulation};
35use mz_sql::ast::{AstInfo, Raw, Statement, TransactionAccessMode};
36use mz_sql::plan::{Params, PlanContext, QueryWhen, StatementDesc};
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::user::{
39    INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER, RoleMetadata, SYSTEM_USER, User,
40};
41use mz_sql::session::vars::IsolationLevel;
42pub use mz_sql::session::vars::{
43    DEFAULT_DATABASE_NAME, EndTransactionAction, SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION,
44    SERVER_PATCH_VERSION, SessionVars, Var,
45};
46use mz_sql_parser::ast::TransactionIsolationLevel;
47use mz_storage_client::client::TableData;
48use mz_storage_types::sources::Timeline;
49use qcell::{QCell, QCellOwner};
50use rand::Rng;
51use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
52use tokio::sync::watch;
53use uuid::Uuid;
54
55use crate::catalog::CatalogState;
56use crate::client::RecordFirstRowStream;
57use crate::coord::appends::BuiltinTableAppendNotify;
58use crate::coord::in_memory_oracle::InMemoryTimestampOracle;
59use crate::coord::peek::PeekResponseUnary;
60use crate::coord::statement_logging::PreparedStatementLoggingInfo;
61use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
62use crate::coord::{Coordinator, ExplainContext};
63use crate::error::AdapterError;
64use crate::metrics::{Metrics, SessionMetrics};
65use crate::{AdapterNotice, ExecuteContext};
66
/// Connection ID used by `Session::dummy`. `Session::new` asserts that sessions
/// created from a caller-supplied `SessionConfig` never use it.
const DUMMY_CONNECTION_ID: ConnectionId = ConnectionId::Static(0);
68
/// A session holds per-connection state.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Session<T = mz_repr::Timestamp>
where
    T: Debug + Clone + Send + Sync,
{
    /// The connection ID of the session. May be reused after the session
    /// terminates (see `SessionConfig::conn_id`).
    conn_id: ConnectionId,
    /// A globally unique identifier for the session. Not to be confused
    /// with `conn_id`, which may be reused.
    uuid: Uuid,
    /// Prepared statements registered on this session, keyed by name.
    prepared_statements: BTreeMap<String, PreparedStatement>,
    /// Portals bound on this session, keyed by portal name. Emptied whenever
    /// the transaction is cleared.
    portals: BTreeMap<String, Portal>,
    /// The status of the session's current transaction.
    transaction: TransactionStatus<T>,
    // NOTE(review): within the visible code this field is only ever assigned
    // `None`; the `SessionMetadata::pcx` accessor reads the plan context from
    // the active transaction instead. Confirm whether it is still needed.
    pcx: Option<PlanContext>,
    /// Metrics handle supplied when the session was constructed.
    metrics: SessionMetrics,
    // Starts out as `None`. NOTE(review): presumably a notification handle for
    // pending builtin table appends; confirm against the code that sets it.
    #[derivative(Debug = "ignore")]
    builtin_updates: Option<BuiltinTableAppendNotify>,

    /// The role metadata of the current session.
    ///
    /// Invariant: role_metadata must be `Some` after the user has
    /// successfully connected to and authenticated with Materialize.
    ///
    /// Prefer using this value over [`SessionConfig::user`].
    //
    // It would be better for this not to be an Option, but the
    // `Session` is initialized before the user has connected to
    // Materialize and is able to look up the `RoleMetadata`. The `Session`
    // is also used to return an error when no role exists and
    // therefore there is no valid `RoleMetadata`.
    role_metadata: Option<RoleMetadata>,
    /// The peer address of the client, if known.
    client_ip: Option<IpAddr>,
    /// The session variables.
    vars: SessionVars,
    /// Sending half of the notice channel; clones are handed out by
    /// `retain_notice_transmitter`.
    notices_tx: mpsc::UnboundedSender<AdapterNotice>,
    /// Receiving half of the notice channel. Because the session also holds a
    /// sender, the channel is never observed as closed from here.
    notices_rx: mpsc::UnboundedReceiver<AdapterNotice>,
    /// The ID that will be given to the next transaction started on this
    /// session. Advanced with wrapping arithmetic.
    next_transaction_id: TransactionId,
    /// Randomly generated when the session is constructed.
    secret_key: u32,
    /// Optional receiver that is polled for updates to the user's external
    /// metadata by `apply_external_metadata_updates`.
    external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    // Token allowing us to access `Arc<QCell<PreparedStatementLoggingInfo>>`
    // metadata. We want these to be reference-counted, because the same
    // statement might be referenced from multiple portals simultaneously.
    //
    // However, they can't be `Rc<RefCell<PreparedStatementLoggingInfo>>`,
    // because the `Session` is sent around to different threads.
    //
    // On the other hand, they don't need to be
    // `Arc<Mutex<PreparedStatementLoggingInfo>>`, because they will always be
    // accessed from the same thread that the `Session` is currently
    // on. We express this by gating access with this token.
    #[derivative(Debug = "ignore")]
    qcell_owner: QCellOwner,
    // Per-timeline in-memory timestamp oracles; starts out empty.
    // NOTE(review): how these are populated is not visible in this part of the file.
    session_oracles: BTreeMap<Timeline, InMemoryTimestampOracle<T, NowFn<T>>>,
    /// Incremented when session state that is relevant to prepared statement planning changes.
    /// Currently, only changes to `portals` are tracked. Changes to `prepared_statements` don't
    /// need to be tracked, because prepared statements can't depend on other prepared statements.
    /// TODO: We might want to track changes also to session variables.
    /// (`Catalog::transient_revision` similarly tracks changes on the catalog side.)
    state_revision: u64,
}
129
130impl<T> SessionMetadata for Session<T>
131where
132    T: Debug + Clone + Send + Sync,
133    T: TimestampManipulation,
134{
135    fn conn_id(&self) -> &ConnectionId {
136        &self.conn_id
137    }
138
139    fn client_ip(&self) -> Option<&IpAddr> {
140        self.client_ip.as_ref()
141    }
142
143    fn pcx(&self) -> &PlanContext {
144        &self
145            .transaction()
146            .inner()
147            .expect("no active transaction")
148            .pcx
149    }
150
151    fn role_metadata(&self) -> &RoleMetadata {
152        self.role_metadata
153            .as_ref()
154            .expect("role_metadata invariant violated")
155    }
156
157    fn vars(&self) -> &SessionVars {
158        &self.vars
159    }
160}
161
/// Data structure suitable for passing to other threads that need access to some common Session
/// properties.
///
/// Instances are produced by `Session::meta`, which clones each value out of
/// the session at the time of the call.
#[derive(Debug)]
pub struct SessionMeta {
    /// The connection ID of the originating session.
    conn_id: ConnectionId,
    /// The peer address of the client, if known.
    client_ip: Option<IpAddr>,
    /// The plan context captured from the session.
    pcx: PlanContext,
    /// The role metadata captured from the session.
    role_metadata: RoleMetadata,
    /// The session variables captured from the session.
    vars: SessionVars,
}
172
173impl SessionMetadata for SessionMeta {
174    fn vars(&self) -> &SessionVars {
175        &self.vars
176    }
177
178    fn conn_id(&self) -> &ConnectionId {
179        &self.conn_id
180    }
181
182    fn client_ip(&self) -> Option<&IpAddr> {
183        self.client_ip.as_ref()
184    }
185
186    fn pcx(&self) -> &PlanContext {
187        &self.pcx
188    }
189
190    fn role_metadata(&self) -> &RoleMetadata {
191        &self.role_metadata
192    }
193}
194
/// Configures a new [`Session`].
///
/// All fields are consumed when the session is constructed.
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// The connection ID for the session.
    ///
    /// May be reused after the session terminates.
    pub conn_id: ConnectionId,
    /// A universally unique identifier for the session, across all processes,
    /// regions, and all time.
    ///
    /// Must not be reused, even after the session terminates.
    pub uuid: Uuid,
    /// The peer address of the client, if known.
    pub client_ip: Option<IpAddr>,
    /// The name of the user associated with the session.
    pub user: String,
    /// An optional receiver that the session will periodically check for
    /// updates to a user's external metadata.
    ///
    /// The value held by the receiver at construction time becomes the
    /// user's initial external metadata.
    pub external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    /// The metadata of the user associated with the session.
    pub internal_user_metadata: Option<InternalUserMetadata>,
    /// The Helm chart version, if any; passed through to the session
    /// variables when the session is constructed.
    pub helm_chart_version: Option<String>,
}
219
220impl<T: TimestampManipulation> Session<T> {
221    /// Creates a new session for the specified connection ID.
222    pub(crate) fn new(
223        build_info: &'static BuildInfo,
224        config: SessionConfig,
225        metrics: SessionMetrics,
226    ) -> Session<T> {
227        assert_ne!(config.conn_id, DUMMY_CONNECTION_ID);
228        Self::new_internal(build_info, config, metrics)
229    }
230
231    /// Returns a reference-less collection of data usable by other tasks that don't have ownership
232    /// of the Session.
233    pub fn meta(&self) -> SessionMeta {
234        SessionMeta {
235            conn_id: self.conn_id().clone(),
236            client_ip: self.client_ip().copied(),
237            pcx: self.pcx().clone(),
238            role_metadata: self.role_metadata().clone(),
239            vars: self.vars.clone(),
240        }
241
242        // TODO: soft_assert that these are the same as Session.
243    }
244
245    /// Creates new statement logging metadata for a one-off
246    /// statement.
247    // Normally, such logging information would be created as part of
248    // allocating a new prepared statement, and a refcounted handle
249    // would be copied from that prepared statement to portals during
250    // binding. However, we also support (via `Command::declare`)
251    // binding a statement directly to a portal without creating an
252    // intermediate prepared statement. Thus, for those cases, a
253    // mechanism for generating the logging metadata directly is needed.
254    pub(crate) fn mint_logging<A: AstInfo>(
255        &self,
256        raw_sql: String,
257        stmt: Option<&Statement<A>>,
258        now: EpochMillis,
259    ) -> Arc<QCell<PreparedStatementLoggingInfo>> {
260        Arc::new(QCell::new(
261            &self.qcell_owner,
262            PreparedStatementLoggingInfo::still_to_log(
263                raw_sql,
264                stmt,
265                now,
266                "".to_string(),
267                self.uuid,
268                false,
269            ),
270        ))
271    }
272
273    pub(crate) fn qcell_rw<'a, T2: 'a>(&'a mut self, cell: &'a Arc<QCell<T2>>) -> &'a mut T2 {
274        self.qcell_owner.rw(&*cell)
275    }
276
277    /// Returns a unique ID for the session.
278    /// Not to be confused with `connection_id`, which can be reused.
279    pub fn uuid(&self) -> Uuid {
280        self.uuid
281    }
282
283    /// Creates a new dummy session.
284    ///
285    /// Dummy sessions are intended for use when executing queries on behalf of
286    /// the system itself, rather than on behalf of a user.
287    pub fn dummy() -> Session<T> {
288        let registry = MetricsRegistry::new();
289        let metrics = Metrics::register_into(&registry);
290        let metrics = metrics.session_metrics();
291        let mut dummy = Self::new_internal(
292            &DUMMY_BUILD_INFO,
293            SessionConfig {
294                conn_id: DUMMY_CONNECTION_ID,
295                uuid: Uuid::new_v4(),
296                user: SYSTEM_USER.name.clone(),
297                client_ip: None,
298                external_metadata_rx: None,
299                internal_user_metadata: None,
300                helm_chart_version: None,
301            },
302            metrics,
303        );
304        dummy.initialize_role_metadata(RoleId::User(0));
305        dummy
306    }
307
308    fn new_internal(
309        build_info: &'static BuildInfo,
310        SessionConfig {
311            conn_id,
312            uuid,
313            user,
314            client_ip,
315            mut external_metadata_rx,
316            internal_user_metadata,
317            helm_chart_version,
318        }: SessionConfig,
319        metrics: SessionMetrics,
320    ) -> Session<T> {
321        let (notices_tx, notices_rx) = mpsc::unbounded_channel();
322        let default_cluster = INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER.get(&user);
323        let user = User {
324            name: user,
325            internal_metadata: internal_user_metadata,
326            external_metadata: external_metadata_rx
327                .as_mut()
328                .map(|rx| rx.borrow_and_update().clone()),
329        };
330        let mut vars = SessionVars::new_unchecked(build_info, user, helm_chart_version);
331        if let Some(default_cluster) = default_cluster {
332            vars.set_cluster(default_cluster.clone());
333        }
334        Session {
335            conn_id,
336            uuid,
337            transaction: TransactionStatus::Default,
338            pcx: None,
339            metrics,
340            builtin_updates: None,
341            prepared_statements: BTreeMap::new(),
342            portals: BTreeMap::new(),
343            role_metadata: None,
344            client_ip,
345            vars,
346            notices_tx,
347            notices_rx,
348            next_transaction_id: 0,
349            secret_key: rand::thread_rng().r#gen(),
350            external_metadata_rx,
351            qcell_owner: QCellOwner::new(),
352            session_oracles: BTreeMap::new(),
353            state_revision: 0,
354        }
355    }
356
357    /// Returns the secret key associated with the session.
358    pub fn secret_key(&self) -> u32 {
359        self.secret_key
360    }
361
362    fn new_pcx(&self, mut wall_time: DateTime<Utc>) -> PlanContext {
363        if let Some(mock_time) = self.vars().unsafe_new_transaction_wall_time() {
364            wall_time = *mock_time;
365        }
366        PlanContext::new(wall_time)
367    }
368
369    /// Starts an explicit transaction, or changes an implicit to an explicit
370    /// transaction.
371    pub fn start_transaction(
372        &mut self,
373        wall_time: DateTime<Utc>,
374        access: Option<TransactionAccessMode>,
375        isolation_level: Option<TransactionIsolationLevel>,
376    ) -> Result<(), AdapterError> {
377        // Check that current transaction state is compatible with new `access`
378        if let Some(txn) = self.transaction.inner() {
379            // `READ WRITE` prohibited if:
380            // - Currently in `READ ONLY`
381            // - Already performed a query
382            let read_write_prohibited = match txn.ops {
383                TransactionOps::Peeks { .. } | TransactionOps::Subscribe => {
384                    txn.access == Some(TransactionAccessMode::ReadOnly)
385                }
386                TransactionOps::None
387                | TransactionOps::Writes(_)
388                | TransactionOps::SingleStatement { .. }
389                | TransactionOps::DDL { .. } => false,
390            };
391
392            if read_write_prohibited && access == Some(TransactionAccessMode::ReadWrite) {
393                return Err(AdapterError::ReadWriteUnavailable);
394            }
395        }
396
397        match std::mem::take(&mut self.transaction) {
398            TransactionStatus::Default => {
399                let id = self.next_transaction_id;
400                self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
401                self.transaction = TransactionStatus::InTransaction(Transaction {
402                    pcx: self.new_pcx(wall_time),
403                    ops: TransactionOps::None,
404                    write_lock_guards: None,
405                    access,
406                    id,
407                });
408            }
409            TransactionStatus::Started(mut txn)
410            | TransactionStatus::InTransactionImplicit(mut txn)
411            | TransactionStatus::InTransaction(mut txn) => {
412                if access.is_some() {
413                    txn.access = access;
414                }
415                self.transaction = TransactionStatus::InTransaction(txn);
416            }
417            TransactionStatus::Failed(_) => unreachable!(),
418        };
419
420        if let Some(isolation_level) = isolation_level {
421            self.vars
422                .set_local_transaction_isolation(isolation_level.into());
423        }
424
425        Ok(())
426    }
427
428    /// Starts either a single statement or implicit transaction based on the
429    /// number of statements, but only if no transaction has been started already.
430    pub fn start_transaction_implicit(&mut self, wall_time: DateTime<Utc>, stmts: usize) {
431        if let TransactionStatus::Default = self.transaction {
432            let id = self.next_transaction_id;
433            self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
434            let txn = Transaction {
435                pcx: self.new_pcx(wall_time),
436                ops: TransactionOps::None,
437                write_lock_guards: None,
438                access: None,
439                id,
440            };
441            match stmts {
442                1 => self.transaction = TransactionStatus::Started(txn),
443                n if n > 1 => self.transaction = TransactionStatus::InTransactionImplicit(txn),
444                _ => {}
445            }
446        }
447    }
448
449    /// Starts a single statement transaction, but only if no transaction has been started already.
450    pub fn start_transaction_single_stmt(&mut self, wall_time: DateTime<Utc>) {
451        self.start_transaction_implicit(wall_time, 1);
452    }
453
454    /// Clears a transaction, setting its state to Default and destroying all
455    /// portals. Returned are:
456    /// - sinks that were started in this transaction and need to be dropped
457    /// - the cleared transaction so its operations can be handled
458    ///
459    /// The [Postgres protocol docs](https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-FLOW-EXT-QUERY) specify:
460    /// > a named portal object lasts till the end of the current transaction
461    /// and
462    /// > An unnamed portal is destroyed at the end of the transaction
463    #[must_use]
464    pub fn clear_transaction(&mut self) -> TransactionStatus<T> {
465        self.portals.clear();
466        self.pcx = None;
467        self.state_revision += 1;
468        mem::take(&mut self.transaction)
469    }
470
471    /// Marks the current transaction as failed.
472    pub fn fail_transaction(mut self) -> Self {
473        match self.transaction {
474            TransactionStatus::Default => unreachable!(),
475            TransactionStatus::Started(txn)
476            | TransactionStatus::InTransactionImplicit(txn)
477            | TransactionStatus::InTransaction(txn) => {
478                self.transaction = TransactionStatus::Failed(txn);
479            }
480            TransactionStatus::Failed(_) => {}
481        };
482        self
483    }
484
485    /// Returns the current transaction status.
486    pub fn transaction(&self) -> &TransactionStatus<T> {
487        &self.transaction
488    }
489
490    /// Returns the current transaction status.
491    pub fn transaction_mut(&mut self) -> &mut TransactionStatus<T> {
492        &mut self.transaction
493    }
494
495    /// Returns the session's transaction code.
496    pub fn transaction_code(&self) -> TransactionCode {
497        self.transaction().into()
498    }
499
500    /// Adds operations to the current transaction. An error is produced if
501    /// they cannot be merged (i.e., a timestamp-dependent read cannot be
502    /// merged to an insert).
503    pub fn add_transaction_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
504        self.transaction.add_ops(add_ops)
505    }
506
507    /// Returns a channel on which to send notices to the session.
508    pub fn retain_notice_transmitter(&self) -> UnboundedSender<AdapterNotice> {
509        self.notices_tx.clone()
510    }
511
512    /// Adds a notice to the session.
513    pub fn add_notice(&self, notice: AdapterNotice) {
514        self.add_notices([notice])
515    }
516
517    /// Adds multiple notices to the session.
518    pub fn add_notices(&self, notices: impl IntoIterator<Item = AdapterNotice>) {
519        for notice in notices {
520            let _ = self.notices_tx.send(notice);
521        }
522    }
523
524    /// Awaits a possible notice.
525    ///
526    /// This method is cancel safe.
527    pub async fn recv_notice(&mut self) -> AdapterNotice {
528        // This method is cancel safe because recv is cancel safe.
529        loop {
530            let notice = self
531                .notices_rx
532                .recv()
533                .await
534                .expect("Session also holds a sender, so recv won't ever return None");
535            match self.notice_filter(notice) {
536                Some(notice) => return notice,
537                None => continue,
538            }
539        }
540    }
541
542    /// Returns a draining iterator over the notices attached to the session.
543    pub fn drain_notices(&mut self) -> Vec<AdapterNotice> {
544        let mut notices = Vec::new();
545        while let Ok(notice) = self.notices_rx.try_recv() {
546            if let Some(notice) = self.notice_filter(notice) {
547                notices.push(notice);
548            }
549        }
550        notices
551    }
552
553    /// Returns Some if the notice should be reported, otherwise None.
554    fn notice_filter(&self, notice: AdapterNotice) -> Option<AdapterNotice> {
555        // Filter out low threshold severity.
556        let minimum_client_severity = self.vars.client_min_messages();
557        let sev = notice.severity();
558        if !minimum_client_severity.should_output_to_client(&sev) {
559            return None;
560        }
561        // Filter out notices for other clusters.
562        if let AdapterNotice::ClusterReplicaStatusChanged { cluster, .. } = &notice {
563            if cluster != self.vars.cluster() {
564                return None;
565            }
566        }
567        Some(notice)
568    }
569
570    /// Sets the transaction ops to `TransactionOps::None`. Must only be used after
571    /// verifying that no transaction anomalies will occur if cleared.
572    pub fn clear_transaction_ops(&mut self) {
573        if let Some(txn) = self.transaction.inner_mut() {
574            txn.ops = TransactionOps::None;
575        }
576    }
577
578    /// If the current transaction ops belong to a read, then sets the
579    /// ops to `None`, returning the old read timestamp context if
580    /// any existed. Must only be used after verifying that no transaction
581    /// anomalies will occur if cleared.
582    pub fn take_transaction_timestamp_context(&mut self) -> Option<TimestampContext<T>> {
583        if let Some(Transaction { ops, .. }) = self.transaction.inner_mut() {
584            if let TransactionOps::Peeks { .. } = ops {
585                let ops = std::mem::take(ops);
586                Some(
587                    ops.timestamp_determination()
588                        .expect("checked above")
589                        .timestamp_context,
590                )
591            } else {
592                None
593            }
594        } else {
595            None
596        }
597    }
598
599    /// Returns the transaction's read timestamp determination, if set.
600    ///
601    /// Returns `None` if there is no active transaction, or if the active
602    /// transaction is not a read transaction.
603    pub fn get_transaction_timestamp_determination(&self) -> Option<TimestampDetermination<T>> {
604        match self.transaction.inner() {
605            Some(Transaction {
606                pcx: _,
607                ops: TransactionOps::Peeks { determination, .. },
608                write_lock_guards: _,
609                access: _,
610                id: _,
611            }) => Some(determination.clone()),
612            _ => None,
613        }
614    }
615
616    /// Whether this session has a timestamp for a read transaction.
617    pub fn contains_read_timestamp(&self) -> bool {
618        matches!(
619            self.transaction.inner(),
620            Some(Transaction {
621                pcx: _,
622                ops: TransactionOps::Peeks {
623                    determination: TimestampDetermination {
624                        timestamp_context: TimestampContext::TimelineTimestamp { .. },
625                        ..
626                    },
627                    ..
628                },
629                write_lock_guards: _,
630                access: _,
631                id: _,
632            })
633        )
634    }
635
636    /// Registers the prepared statement under `name`.
637    pub fn set_prepared_statement(
638        &mut self,
639        name: String,
640        stmt: Option<Statement<Raw>>,
641        raw_sql: String,
642        desc: StatementDesc,
643        state_revision: StateRevision,
644        now: EpochMillis,
645    ) {
646        let logging = PreparedStatementLoggingInfo::still_to_log(
647            raw_sql,
648            stmt.as_ref(),
649            now,
650            name.clone(),
651            self.uuid,
652            false,
653        );
654        let statement = PreparedStatement {
655            stmt,
656            desc,
657            state_revision,
658            logging: Arc::new(QCell::new(&self.qcell_owner, logging)),
659        };
660        self.prepared_statements.insert(name, statement);
661    }
662
663    /// Removes the prepared statement associated with `name`.
664    ///
665    /// Returns whether a statement previously existed.
666    pub fn remove_prepared_statement(&mut self, name: &str) -> bool {
667        self.prepared_statements.remove(name).is_some()
668    }
669
670    /// Removes all prepared statements.
671    pub fn remove_all_prepared_statements(&mut self) {
672        self.prepared_statements.clear();
673    }
674
675    /// Retrieves the prepared statement associated with `name`.
676    ///
677    /// This is unverified and could be incorrect if the underlying catalog has
678    /// changed.
679    pub fn get_prepared_statement_unverified(&self, name: &str) -> Option<&PreparedStatement> {
680        self.prepared_statements.get(name)
681    }
682
683    /// Retrieves the prepared statement associated with `name`.
684    ///
685    /// This is unverified and could be incorrect if the underlying catalog has
686    /// changed.
687    pub fn get_prepared_statement_mut_unverified(
688        &mut self,
689        name: &str,
690    ) -> Option<&mut PreparedStatement> {
691        self.prepared_statements.get_mut(name)
692    }
693
694    /// Returns the prepared statements for the session.
695    pub fn prepared_statements(&self) -> &BTreeMap<String, PreparedStatement> {
696        &self.prepared_statements
697    }
698
699    /// Returns the portals for the session.
700    pub fn portals(&self) -> &BTreeMap<String, Portal> {
701        &self.portals
702    }
703
704    /// Binds the specified portal to the specified prepared statement.
705    ///
706    /// If the prepared statement contains parameters, the values and types of
707    /// those parameters must be provided in `params`. It is the caller's
708    /// responsibility to ensure that the correct number of parameters is
709    /// provided.
710    ///
711    /// The `results_formats` parameter sets the desired format of the results,
712    /// and is stored on the portal.
713    pub fn set_portal(
714        &mut self,
715        portal_name: String,
716        desc: StatementDesc,
717        stmt: Option<Statement<Raw>>,
718        logging: Arc<QCell<PreparedStatementLoggingInfo>>,
719        params: Vec<(Datum, ScalarType)>,
720        result_formats: Vec<Format>,
721        state_revision: StateRevision,
722    ) -> Result<(), AdapterError> {
723        // The empty portal can be silently replaced.
724        if !portal_name.is_empty() && self.portals.contains_key(&portal_name) {
725            return Err(AdapterError::DuplicateCursor(portal_name));
726        }
727        self.state_revision += 1;
728        let param_types = desc.param_types.clone();
729        self.portals.insert(
730            portal_name,
731            Portal {
732                stmt: stmt.map(Arc::new),
733                desc,
734                state_revision,
735                parameters: Params {
736                    datums: Row::pack(params.iter().map(|(d, _t)| d)),
737                    execute_types: params.into_iter().map(|(_d, t)| t).collect(),
738                    expected_types: param_types,
739                },
740                result_formats,
741                state: PortalState::NotStarted,
742                logging,
743                lifecycle_timestamps: None,
744            },
745        );
746        Ok(())
747    }
748
749    /// Removes the specified portal.
750    ///
751    /// If there is no such portal, this method does nothing. Returns whether that portal existed.
752    pub fn remove_portal(&mut self, portal_name: &str) -> bool {
753        self.state_revision += 1;
754        self.portals.remove(portal_name).is_some()
755    }
756
757    /// Retrieves a reference to the specified portal.
758    ///
759    /// If there is no such portal, returns `None`.
760    pub fn get_portal_unverified(&self, portal_name: &str) -> Option<&Portal> {
761        self.portals.get(portal_name)
762    }
763
764    /// Retrieves a mutable reference to the specified portal.
765    ///
766    /// If there is no such portal, returns `None`.
767    ///
768    /// Note: When using the returned `PortalRefMut`, there is no need to increment
769    /// `Session::state_revision`, because the portal's meaning is not changed.
770    pub fn get_portal_unverified_mut(&mut self, portal_name: &str) -> Option<PortalRefMut<'_>> {
771        self.portals.get_mut(portal_name).map(|p| PortalRefMut {
772            stmt: &p.stmt,
773            desc: &p.desc,
774            state_revision: &mut p.state_revision,
775            parameters: &mut p.parameters,
776            result_formats: &mut p.result_formats,
777            logging: &mut p.logging,
778            state: &mut p.state,
779            lifecycle_timestamps: &mut p.lifecycle_timestamps,
780        })
781    }
782
783    /// Creates and installs a new portal.
784    pub fn create_new_portal(
785        &mut self,
786        stmt: Option<Statement<Raw>>,
787        logging: Arc<QCell<PreparedStatementLoggingInfo>>,
788        desc: StatementDesc,
789        parameters: Params,
790        result_formats: Vec<Format>,
791        state_revision: StateRevision,
792    ) -> Result<String, AdapterError> {
793        self.state_revision += 1;
794
795        // See: https://github.com/postgres/postgres/blob/84f5c2908dad81e8622b0406beea580e40bb03ac/src/backend/utils/mmgr/portalmem.c#L234
796        for i in 0usize.. {
797            let name = format!("<unnamed portal {}>", i);
798            match self.portals.entry(name.clone()) {
799                Entry::Occupied(_) => continue,
800                Entry::Vacant(entry) => {
801                    entry.insert(Portal {
802                        stmt: stmt.map(Arc::new),
803                        desc,
804                        state_revision,
805                        parameters,
806                        result_formats,
807                        state: PortalState::NotStarted,
808                        logging,
809                        lifecycle_timestamps: None,
810                    });
811                    return Ok(name);
812                }
813            }
814        }
815
816        coord_bail!("unable to create a new portal");
817    }
818
    /// Resets the session: clears the current transaction (discarding whatever
    /// `clear_transaction` returns), removes all prepared statements, and resets all session
    /// variables.
    pub fn reset(&mut self) {
        let _ = self.clear_transaction();
        self.prepared_statements.clear();
        self.vars.reset_all();
    }
826
    /// Returns the [application_name] that created this session, as recorded in the session
    /// variables.
    ///
    /// [application_name]: https://www.postgresql.org/docs/current/runtime-config-logging.html#GUC-APPLICATION-NAME
    pub fn application_name(&self) -> &str {
        self.vars.application_name()
    }
833
    /// Returns a shared reference to this session's [`SessionVars`].
    pub fn vars(&self) -> &SessionVars {
        &self.vars
    }
838
    /// Returns a mutable reference to this session's [`SessionVars`].
    pub fn vars_mut(&mut self) -> &mut SessionVars {
        &mut self.vars
    }
843
    /// Grants a set of write locks to this session's inner [`Transaction`].
    ///
    /// # Errors
    /// If the transaction has already been granted write locks, returns a reference to the locks
    /// it currently holds. See [`TransactionStatus::try_grant_write_locks`].
    ///
    /// # Panics
    /// If the inner transaction is idle. See [`TransactionStatus::try_grant_write_locks`].
    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
        self.transaction.try_grant_write_locks(guards)
    }
852
853    /// Drains any external metadata updates and applies the changes from the latest update.
854    pub fn apply_external_metadata_updates(&mut self) {
855        // If no sender is registered then there isn't anything to do.
856        let Some(rx) = &mut self.external_metadata_rx else {
857            return;
858        };
859
860        // If the value hasn't changed then return.
861        if !rx.has_changed().unwrap_or(false) {
862            return;
863        }
864
865        // Update our metadata! Note the short critical section (just a clone) to avoid blocking
866        // the sending side of this watch channel.
867        let metadata = rx.borrow_and_update().clone();
868        self.vars.set_external_user_metadata(metadata);
869    }
870
    /// Initializes the session's role metadata from `role_id`, replacing any role metadata that
    /// was previously set.
    pub fn initialize_role_metadata(&mut self, role_id: RoleId) {
        self.role_metadata = Some(RoleMetadata::new(role_id));
    }
875
876    /// Ensures that a timestamp oracle exists for `timeline` and returns a mutable reference to
877    /// the timestamp oracle.
878    pub fn ensure_timestamp_oracle(
879        &mut self,
880        timeline: Timeline,
881    ) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
882        self.session_oracles
883            .entry(timeline)
884            .or_insert_with(|| InMemoryTimestampOracle::new(T::minimum(), NowFn::from(T::minimum)))
885    }
886
    /// Ensures that a timestamp oracle exists for reads and writes from/to a local input and
    /// returns a mutable reference to the timestamp oracle.
    ///
    /// This is the oracle for [`Timeline::EpochMilliseconds`]; see
    /// [`Self::ensure_timestamp_oracle`].
    pub fn ensure_local_timestamp_oracle(&mut self) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
        self.ensure_timestamp_oracle(Timeline::EpochMilliseconds)
    }
892
    /// Returns a reference to the timestamp oracle for `timeline`, or `None` if this session has
    /// no oracle for that timeline. Unlike [`Self::ensure_timestamp_oracle`], this never creates
    /// one.
    pub fn get_timestamp_oracle(
        &self,
        timeline: &Timeline,
    ) -> Option<&InMemoryTimestampOracle<T, NowFn<T>>> {
        self.session_oracles.get(timeline)
    }
900
901    /// If the current session is using the Strong Session Serializable isolation level advance the
902    /// session local timestamp oracle to `write_ts`.
903    pub fn apply_write(&mut self, timestamp: T) {
904        if self.vars().transaction_isolation() == &IsolationLevel::StrongSessionSerializable {
905            self.ensure_local_timestamp_oracle().apply_write(timestamp);
906        }
907    }
908
    /// Returns a reference to the [`SessionMetrics`] instance associated with this [`Session`].
    pub fn metrics(&self) -> &SessionMetrics {
        &self.metrics
    }
913
914    /// Sets the `BuiltinTableAppendNotify` for this session.
915    pub fn set_builtin_table_updates(&mut self, fut: BuiltinTableAppendNotify) {
916        let prev = self.builtin_updates.replace(fut);
917        mz_ore::soft_assert_or_log!(prev.is_none(), "replacing old builtin table notify");
918    }
919
920    /// Takes the stashed `BuiltinTableAppendNotify`, if one exists, and returns a [`Future`] that
921    /// waits for the writes to complete.
922    pub fn clear_builtin_table_updates(&mut self) -> Option<impl Future<Output = ()> + 'static> {
923        if let Some(fut) = self.builtin_updates.take() {
924            // Record how long we blocked for, if we blocked at all.
925            let histogram = self
926                .metrics()
927                .session_startup_table_writes_seconds()
928                .clone();
929            Some(async move {
930                fut.wall_time().observe(histogram).await;
931            })
932        } else {
933            None
934        }
935    }
936
    /// Returns the session's state revision, which dependent objects can use to detect when they
    /// need to re-plan due to session state changes.
    pub fn state_revision(&self) -> u64 {
        self.state_revision
    }
942}
943
/// A prepared statement.
///
/// Bundles the (optional) parsed statement with its description, the state revision at which it
/// was last verified, and a handle to statement-logging metadata.
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct PreparedStatement {
    /// The parsed statement, or `None` if the prepared statement was the empty query.
    stmt: Option<Statement<Raw>>,
    /// The description of the prepared statement.
    desc: StatementDesc,
    /// The most recent state revision that has verified this statement.
    pub state_revision: StateRevision,
    /// A handle to metadata needed for statement logging. Omitted from `Debug` output.
    #[derivative(Debug = "ignore")]
    logging: Arc<QCell<PreparedStatementLoggingInfo>>,
}
955
impl PreparedStatement {
    /// Returns the AST associated with this prepared statement, or `None` if the prepared
    /// statement was the empty query.
    pub fn stmt(&self) -> Option<&Statement<Raw>> {
        self.stmt.as_ref()
    }

    /// Returns the description of the prepared statement.
    pub fn desc(&self) -> &StatementDesc {
        &self.desc
    }

    /// Returns a handle to the metadata for statement logging.
    pub fn logging(&self) -> &Arc<QCell<PreparedStatementLoggingInfo>> {
        &self.logging
    }
}
973
/// A portal represents the execution state of a running or runnable query.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Portal {
    /// The statement that is bound to this portal, if any.
    pub stmt: Option<Arc<Statement<Raw>>>,
    /// The statement description.
    pub desc: StatementDesc,
    /// The most recent state revision that has verified this portal.
    pub state_revision: StateRevision,
    /// The bound values for the parameters in the prepared statement, if any.
    pub parameters: Params,
    /// The desired output format for each column in the result set.
    pub result_formats: Vec<Format>,
    /// A handle to metadata needed for statement logging. Omitted from `Debug` output.
    #[derivative(Debug = "ignore")]
    pub logging: Arc<QCell<PreparedStatementLoggingInfo>>,
    /// The execution state of the portal. Omitted from `Debug` output.
    #[derivative(Debug = "ignore")]
    pub state: PortalState,
    /// Statement lifecycle timestamps coming from `mz-pgwire`, if any have been recorded.
    pub lifecycle_timestamps: Option<LifecycleTimestamps>,
}
997
/// A mutable reference to a portal, capturing its state and associated metadata.
///
/// Importantly, it does _not_ give _mutable_ access to `stmt` and `desc`. This means that you do
/// not need to increment `Session::state_revision` when modifying fields through a
/// `PortalRefMut`, because the portal's meaning is not changed.
pub struct PortalRefMut<'a> {
    /// The statement that is bound to this portal (read-only).
    pub stmt: &'a Option<Arc<Statement<Raw>>>,
    /// The statement description (read-only).
    pub desc: &'a StatementDesc,
    /// The most recent state revision that has verified this portal.
    pub state_revision: &'a mut StateRevision,
    /// The bound values for the parameters in the prepared statement, if any.
    pub parameters: &'a mut Params,
    /// The desired output format for each column in the result set.
    pub result_formats: &'a mut Vec<Format>,
    /// A handle to metadata needed for statement logging.
    pub logging: &'a mut Arc<QCell<PreparedStatementLoggingInfo>>,
    /// The execution state of the portal.
    pub state: &'a mut PortalState,
    /// Statement lifecycle timestamps coming from `mz-pgwire`.
    pub lifecycle_timestamps: &'a mut Option<LifecycleTimestamps>,
}
1020
/// Points to a revision of catalog state and session state.
///
/// When the current revisions are not the same as the revisions when a prepared statement or a
/// portal was described, we need to check whether the description is still valid.
///
/// Both fields are plain integers, so equality is total and the type derives `Eq` alongside
/// `PartialEq`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct StateRevision {
    /// A revision of the catalog.
    pub catalog_revision: u64,
    /// A revision of the session state.
    pub session_state_revision: u64,
}
1031
/// Execution states of a portal.
pub enum PortalState {
    /// Portal not yet started.
    NotStarted,
    /// Portal is a rows-returning statement in progress with 0 or more rows
    /// remaining.
    InProgress(Option<InProgressRows>),
    /// Portal has completed and should not be re-executed. If the optional string
    /// is present, it is returned as a CommandComplete tag; otherwise an error
    /// is sent.
    Completed(Option<String>),
}
1044
/// State of an in-progress, rows-returning portal.
pub struct InProgressRows {
    /// The current batch of rows, or `None` if no batch is currently stashed.
    pub current: Option<Box<dyn RowIterator + Send + Sync>>,
    /// A stream from which to fetch more row batches.
    pub remaining: RecordFirstRowStream,
}
1052
1053impl InProgressRows {
1054    /// Creates a new InProgressRows from a batch stream.
1055    pub fn new(remaining: RecordFirstRowStream) -> Self {
1056        Self {
1057            current: None,
1058            remaining,
1059        }
1060    }
1061
1062    /// Determines whether the underlying stream has ended and there are also no more rows
1063    /// stashed in `current`.
1064    pub fn no_more_rows(&self) -> bool {
1065        self.remaining.no_more_rows && self.current.is_none()
1066    }
1067}
1068
/// The receiving end of an unbounded channel of batched rows, delivered as
/// `PeekResponseUnary` messages.
pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
1071
/// Part of statement lifecycle. These are timestamps that come from the Adapter frontend
/// (`mz-pgwire`) part of the lifecycle.
#[derive(Debug, Clone)]
pub struct LifecycleTimestamps {
    /// When the query was received. More specifically, when the tokio recv returned.
    ///
    /// For a Simple Query, this is for the whole query. For the Extended Query flow, this is only
    /// for `FrontendMessage::Execute`. (This means that this is after parsing for the
    /// Extended Query flow.)
    pub received: EpochMillis,
}
1082
impl LifecycleTimestamps {
    /// Creates a new `LifecycleTimestamps` whose `received` timestamp is the given value.
    pub fn new(received: EpochMillis) -> Self {
        Self { received }
    }
}
1089
/// The transaction status of a session.
///
/// PostgreSQL's transaction states are in backend/access/transam/xact.c.
#[derive(Debug)]
pub enum TransactionStatus<T> {
    /// Idle. Matches `TBLOCK_DEFAULT`.
    Default,
    /// Running a single-query transaction. Matches
    /// `TBLOCK_STARTED`. In PostgreSQL, when using the extended query protocol, this
    /// may be upgraded into multi-statement implicit query (see [`Self::InTransactionImplicit`]).
    /// Additionally, some statements may trigger an eager commit of the implicit transaction,
    /// see: <https://git.postgresql.org/gitweb/?p=postgresql.git&a=commitdiff&h=f92944137>. In
    /// Materialize however, we eagerly commit all statements outside of an explicit transaction
    /// when using the extended query protocol. Therefore, we can guarantee that this state will
    /// always be a single-query transaction and never be upgraded into a multi-statement implicit
    /// query.
    Started(Transaction<T>),
    /// Currently in a transaction issued from a `BEGIN`. Matches `TBLOCK_INPROGRESS`.
    InTransaction(Transaction<T>),
    /// Currently in an implicit transaction started from a multi-statement query
    /// with more than one statement. Matches `TBLOCK_IMPLICIT_INPROGRESS`.
    InTransactionImplicit(Transaction<T>),
    /// In a failed transaction. Matches `TBLOCK_ABORT`.
    Failed(Transaction<T>),
}
1115
1116impl<T: TimestampManipulation> TransactionStatus<T> {
1117    /// Extracts the inner transaction ops and write lock guard if not failed.
1118    pub fn into_ops_and_lock_guard(self) -> (Option<TransactionOps<T>>, Option<WriteLocks>) {
1119        match self {
1120            TransactionStatus::Default | TransactionStatus::Failed(_) => (None, None),
1121            TransactionStatus::Started(txn)
1122            | TransactionStatus::InTransaction(txn)
1123            | TransactionStatus::InTransactionImplicit(txn) => {
1124                (Some(txn.ops), txn.write_lock_guards)
1125            }
1126        }
1127    }
1128
1129    /// Exposes the inner transaction.
1130    pub fn inner(&self) -> Option<&Transaction<T>> {
1131        match self {
1132            TransactionStatus::Default => None,
1133            TransactionStatus::Started(txn)
1134            | TransactionStatus::InTransaction(txn)
1135            | TransactionStatus::InTransactionImplicit(txn)
1136            | TransactionStatus::Failed(txn) => Some(txn),
1137        }
1138    }
1139
1140    /// Exposes the inner transaction.
1141    pub fn inner_mut(&mut self) -> Option<&mut Transaction<T>> {
1142        match self {
1143            TransactionStatus::Default => None,
1144            TransactionStatus::Started(txn)
1145            | TransactionStatus::InTransaction(txn)
1146            | TransactionStatus::InTransactionImplicit(txn)
1147            | TransactionStatus::Failed(txn) => Some(txn),
1148        }
1149    }
1150
1151    /// Whether the transaction's ops are DDL.
1152    pub fn is_ddl(&self) -> bool {
1153        match self {
1154            TransactionStatus::Default => false,
1155            TransactionStatus::Started(txn)
1156            | TransactionStatus::InTransaction(txn)
1157            | TransactionStatus::InTransactionImplicit(txn)
1158            | TransactionStatus::Failed(txn) => {
1159                matches!(txn.ops, TransactionOps::DDL { .. })
1160            }
1161        }
1162    }
1163
1164    /// Expresses whether or not the transaction was implicitly started.
1165    /// However, its negation does not imply explicitly started.
1166    pub fn is_implicit(&self) -> bool {
1167        match self {
1168            TransactionStatus::Started(_) | TransactionStatus::InTransactionImplicit(_) => true,
1169            TransactionStatus::Default
1170            | TransactionStatus::InTransaction(_)
1171            | TransactionStatus::Failed(_) => false,
1172        }
1173    }
1174
1175    /// Whether the transaction may contain multiple statements.
1176    pub fn is_in_multi_statement_transaction(&self) -> bool {
1177        match self {
1178            TransactionStatus::InTransaction(_) | TransactionStatus::InTransactionImplicit(_) => {
1179                true
1180            }
1181            TransactionStatus::Default
1182            | TransactionStatus::Started(_)
1183            | TransactionStatus::Failed(_) => false,
1184        }
1185    }
1186
    /// Whether the transaction is a multi-statement transaction (see
    /// [`Self::is_in_multi_statement_transaction`]) and `when` is [`QueryWhen::Immediately`].
    pub fn in_immediate_multi_stmt_txn(&self, when: &QueryWhen) -> bool {
        self.is_in_multi_statement_transaction() && when == &QueryWhen::Immediately
    }
1191
1192    /// Grants the writes lock to the inner transaction, returning an error if the transaction
1193    /// has already been granted write locks.
1194    ///
1195    /// # Panics
1196    /// If `self` is `TransactionStatus::Default`, which indicates that the
1197    /// transaction is idle, which is not appropriate to assign the
1198    /// coordinator's write lock to.
1199    ///
1200    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1201        match self {
1202            TransactionStatus::Default => panic!("cannot grant write lock to txn not yet started"),
1203            TransactionStatus::Started(txn)
1204            | TransactionStatus::InTransaction(txn)
1205            | TransactionStatus::InTransactionImplicit(txn)
1206            | TransactionStatus::Failed(txn) => txn.try_grant_write_locks(guards),
1207        }
1208    }
1209
1210    /// Returns the currently held [`WriteLocks`], if this transaction holds any.
1211    pub fn write_locks(&self) -> Option<&WriteLocks> {
1212        match self {
1213            TransactionStatus::Default => None,
1214            TransactionStatus::Started(txn)
1215            | TransactionStatus::InTransaction(txn)
1216            | TransactionStatus::InTransactionImplicit(txn)
1217            | TransactionStatus::Failed(txn) => txn.write_lock_guards.as_ref(),
1218        }
1219    }
1220
1221    /// The timeline of the transaction, if one exists.
1222    pub fn timeline(&self) -> Option<Timeline> {
1223        match self {
1224            TransactionStatus::Default => None,
1225            TransactionStatus::Started(txn)
1226            | TransactionStatus::InTransaction(txn)
1227            | TransactionStatus::InTransactionImplicit(txn)
1228            | TransactionStatus::Failed(txn) => txn.timeline(),
1229        }
1230    }
1231
1232    /// The cluster of the transaction, if one exists.
1233    pub fn cluster(&self) -> Option<ClusterId> {
1234        match self {
1235            TransactionStatus::Default => None,
1236            TransactionStatus::Started(txn)
1237            | TransactionStatus::InTransaction(txn)
1238            | TransactionStatus::InTransactionImplicit(txn)
1239            | TransactionStatus::Failed(txn) => txn.cluster(),
1240        }
1241    }
1242
1243    /// Snapshot of the catalog that reflects DDL operations run in this transaction.
1244    pub fn catalog_state(&self) -> Option<&CatalogState> {
1245        match self.inner() {
1246            Some(Transaction {
1247                ops: TransactionOps::DDL { state, .. },
1248                ..
1249            }) => Some(state),
1250            _ => None,
1251        }
1252    }
1253
1254    /// Reports whether any operations have been executed as part of this transaction
1255    pub fn contains_ops(&self) -> bool {
1256        match self.inner() {
1257            Some(txn) => txn.contains_ops(),
1258            None => false,
1259        }
1260    }
1261
1262    /// Checks whether the current state of this transaction allows writes
1263    /// (adding write ops).
1264    /// transaction
1265    pub fn allows_writes(&self) -> bool {
1266        match self {
1267            TransactionStatus::Started(Transaction { ops, access, .. })
1268            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1269            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1270                match ops {
1271                    TransactionOps::None => access != &Some(TransactionAccessMode::ReadOnly),
1272                    TransactionOps::Peeks { determination, .. } => {
1273                        // If-and-only-if peeks thus far do not have a timestamp
1274                        // (i.e. they are constant), we can switch to a write
1275                        // transaction.
1276                        !determination.timestamp_context.contains_timestamp()
1277                    }
1278                    TransactionOps::Subscribe => false,
1279                    TransactionOps::Writes(_) => true,
1280                    TransactionOps::SingleStatement { .. } => false,
1281                    TransactionOps::DDL { .. } => false,
1282                }
1283            }
1284            TransactionStatus::Default | TransactionStatus::Failed(_) => {
1285                unreachable!()
1286            }
1287        }
1288    }
1289
    /// Adds operations to the current transaction. An error is produced if they cannot be merged
    /// (e.g., a timestamp-dependent read cannot be merged to an insert).
    ///
    /// The `DDL` variant is an exception and does not merge operations, but instead overwrites the
    /// old ops with the new ops. This is correct because it is only used in conjunction with the
    /// Dry Run catalog op which returns an error containing all of the ops, and those ops are
    /// passed to this function which then overwrites. The overwrite is skipped when the new ops
    /// are empty; side effects are always appended rather than overwritten.
    ///
    /// # Errors
    /// - `ReadOnlyTransaction`: writes are added to a read-only transaction with no ops yet, or
    ///   anything other than peeks (or writes, while the peeks so far have no timestamp) is added
    ///   to a transaction that has performed peeks.
    /// - `SubscribeOnlyTransaction`: the transaction has already done a subscribe.
    /// - `WriteOnlyTransaction`: anything other than writes or timestamp-less peeks is added to a
    ///   transaction that has performed writes.
    /// - `SingleStatementTransaction`: the transaction already holds a single statement.
    /// - `DDLTransactionRace`: DDL ops are added whose revision differs from the transaction's.
    /// - `DDLOnlyTransaction`: non-DDL ops are added to a DDL transaction.
    ///
    /// # Panics
    /// If the operations are compatible but the operation metadata doesn't match. Such as reads at
    /// different timestamps, reads on different timelines, reads on different clusters, etc. It's
    /// up to the caller to make sure these are aligned.
    ///
    /// Also panics if the status is `Default` or `Failed`.
    pub fn add_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
        match self {
            TransactionStatus::Started(Transaction { ops, access, .. })
            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
                match ops {
                    // No ops yet: adopt the new ops wholesale, unless they are writes and the
                    // transaction is read-only.
                    TransactionOps::None => {
                        if matches!(access, Some(TransactionAccessMode::ReadOnly))
                            && matches!(add_ops, TransactionOps::Writes(_))
                        {
                            return Err(AdapterError::ReadOnlyTransaction);
                        }
                        *ops = add_ops;
                    }
                    TransactionOps::Peeks {
                        determination,
                        cluster_id,
                        requires_linearization,
                    } => match add_ops {
                        TransactionOps::Peeks {
                            determination: add_timestamp_determination,
                            cluster_id: add_cluster_id,
                            requires_linearization: add_requires_linearization,
                        } => {
                            assert_eq!(*cluster_id, add_cluster_id);
                            match (
                                &determination.timestamp_context,
                                &add_timestamp_determination.timestamp_context,
                            ) {
                                // Both peeks are timestamped: they must agree on timeline and
                                // chosen timestamp.
                                (
                                    TimestampContext::TimelineTimestamp {
                                        timeline: txn_timeline,
                                        chosen_ts: txn_ts,
                                        oracle_ts: _,
                                    },
                                    TimestampContext::TimelineTimestamp {
                                        timeline: add_timeline,
                                        chosen_ts: add_ts,
                                        oracle_ts: _,
                                    },
                                ) => {
                                    assert_eq!(txn_timeline, add_timeline);
                                    assert_eq!(txn_ts, add_ts);
                                }
                                // The transaction had no timestamp so far: take over the new
                                // determination.
                                (TimestampContext::NoTimestamp, _) => {
                                    *determination = add_timestamp_determination
                                }
                                // The new peek has no timestamp: keep the existing determination.
                                (_, TimestampContext::NoTimestamp) => {}
                            };
                            // Linearization is only ever upgraded from not-required to required.
                            if matches!(requires_linearization, RequireLinearization::NotRequired)
                                && matches!(
                                    add_requires_linearization,
                                    RequireLinearization::Required
                                )
                            {
                                *requires_linearization = add_requires_linearization;
                            }
                        }
                        // Iff peeks thus far do not have a timestamp (i.e.
                        // they are constant), we can switch to a write
                        // transaction.
                        writes @ TransactionOps::Writes(..)
                            if !determination.timestamp_context.contains_timestamp() =>
                        {
                            *ops = writes;
                        }
                        _ => return Err(AdapterError::ReadOnlyTransaction),
                    },
                    TransactionOps::Subscribe => {
                        return Err(AdapterError::SubscribeOnlyTransaction);
                    }
                    TransactionOps::Writes(txn_writes) => match add_ops {
                        TransactionOps::Writes(mut add_writes) => {
                            // We should have already checked the access above, but make sure we don't miss
                            // it anyway.
                            assert!(!matches!(access, Some(TransactionAccessMode::ReadOnly)));
                            txn_writes.append(&mut add_writes);
                        }
                        // Iff peeks do not have a timestamp (i.e. they are
                        // constant), we can permit them.
                        TransactionOps::Peeks { determination, .. }
                            if !determination.timestamp_context.contains_timestamp() => {}
                        _ => {
                            return Err(AdapterError::WriteOnlyTransaction);
                        }
                    },
                    TransactionOps::SingleStatement { .. } => {
                        return Err(AdapterError::SingleStatementTransaction);
                    }
                    TransactionOps::DDL {
                        ops: og_ops,
                        revision: og_revision,
                        state: og_state,
                        side_effects,
                    } => match add_ops {
                        TransactionOps::DDL {
                            ops: new_ops,
                            revision: new_revision,
                            side_effects: mut net_new_side_effects,
                            state: new_state,
                        } => {
                            if *og_revision != new_revision {
                                return Err(AdapterError::DDLTransactionRace);
                            }
                            // The old og_ops are overwritten, not extended.
                            if !new_ops.is_empty() {
                                *og_ops = new_ops;
                                *og_state = new_state;
                            }
                            side_effects.append(&mut net_new_side_effects);
                        }
                        _ => return Err(AdapterError::DDLOnlyTransaction),
                    },
                }
            }
            TransactionStatus::Default | TransactionStatus::Failed(_) => {
                unreachable!()
            }
        }
        Ok(())
    }
1423}
1424
/// An identifier used to tell different transactions apart.
pub type TransactionId = u64;
1427
1428impl<T> Default for TransactionStatus<T> {
1429    fn default() -> Self {
1430        TransactionStatus::Default
1431    }
1432}
1433
/// State data for transactions.
#[derive(Debug)]
pub struct Transaction<T> {
    /// Plan context.
    pub pcx: PlanContext,
    /// Transaction operations.
    pub ops: TransactionOps<T>,
    /// Uniquely identifies the transaction on a per connection basis.
    /// Two transactions started from separate connections may share the
    /// same ID.
    /// If all IDs have been exhausted, this will wrap around back to 0.
    pub id: TransactionId,
    /// Locks for objects this transaction will operate on, or `None` if no locks have been
    /// granted.
    write_lock_guards: Option<WriteLocks>,
    /// Access mode (read only, read write), if one was specified.
    access: Option<TransactionAccessMode>,
}
1451
1452impl<T> Transaction<T> {
1453    /// Tries to grant the write lock to this transaction for the remainder of its lifetime. Errors
1454    /// if this [`Transaction`] has already been granted write locks.
1455    fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1456        match &mut self.write_lock_guards {
1457            Some(existing) => Err(existing),
1458            locks @ None => {
1459                *locks = Some(guards);
1460                Ok(())
1461            }
1462        }
1463    }
1464
1465    /// The timeline of the transaction, if one exists.
1466    fn timeline(&self) -> Option<Timeline> {
1467        match &self.ops {
1468            TransactionOps::Peeks {
1469                determination:
1470                    TimestampDetermination {
1471                        timestamp_context: TimestampContext::TimelineTimestamp { timeline, .. },
1472                        ..
1473                    },
1474                ..
1475            } => Some(timeline.clone()),
1476            TransactionOps::Peeks { .. }
1477            | TransactionOps::None
1478            | TransactionOps::Subscribe
1479            | TransactionOps::Writes(_)
1480            | TransactionOps::SingleStatement { .. }
1481            | TransactionOps::DDL { .. } => None,
1482        }
1483    }
1484
1485    /// The cluster of the transaction, if one exists.
1486    pub fn cluster(&self) -> Option<ClusterId> {
1487        match &self.ops {
1488            TransactionOps::Peeks { cluster_id, .. } => Some(cluster_id.clone()),
1489            TransactionOps::None
1490            | TransactionOps::Subscribe
1491            | TransactionOps::Writes(_)
1492            | TransactionOps::SingleStatement { .. }
1493            | TransactionOps::DDL { .. } => None,
1494        }
1495    }
1496
1497    /// Reports whether any operations have been executed as part of this transaction
1498    fn contains_ops(&self) -> bool {
1499        !matches!(self.ops, TransactionOps::None)
1500    }
1501}
1502
/// A transaction's status code.
///
/// Each code converts to a single ASCII byte (`I`, `T`, or `E`), or to the one-character string
/// holding that byte.
#[derive(Debug, Clone, Copy)]
pub enum TransactionCode {
    /// Not currently in a transaction
    Idle,
    /// Currently in a transaction
    InTransaction,
    /// Currently in a transaction block which is failed
    Failed,
}

impl From<TransactionCode> for u8 {
    /// Maps the code to its single-byte representation.
    fn from(code: TransactionCode) -> Self {
        match code {
            TransactionCode::Failed => b'E',
            TransactionCode::InTransaction => b'T',
            TransactionCode::Idle => b'I',
        }
    }
}

impl From<TransactionCode> for String {
    /// Maps the code to a one-character string holding its byte representation.
    fn from(code: TransactionCode) -> Self {
        let byte: u8 = code.into();
        char::from(byte).into()
    }
}
1529
1530impl<T> From<&TransactionStatus<T>> for TransactionCode {
1531    /// Convert from the Session's version
1532    fn from(status: &TransactionStatus<T>) -> TransactionCode {
1533        match status {
1534            TransactionStatus::Default => TransactionCode::Idle,
1535            TransactionStatus::Started(_) => TransactionCode::InTransaction,
1536            TransactionStatus::InTransaction(_) => TransactionCode::InTransaction,
1537            TransactionStatus::InTransactionImplicit(_) => TransactionCode::InTransaction,
1538            TransactionStatus::Failed(_) => TransactionCode::Failed,
1539        }
1540    }
1541}
1542
1543/// The type of operation being performed by the transaction.
1544///
1545/// This is needed because we currently do not allow mixing reads and writes in
1546/// a transaction. Use this to record what we have done, and what may need to
1547/// happen at commit.
1548#[derive(Derivative)]
1549#[derivative(Debug)]
1550pub enum TransactionOps<T> {
1551    /// The transaction has been initiated, but no statement has yet been executed
1552    /// in it.
1553    None,
1554    /// This transaction has had a peek (`SELECT`, `SUBSCRIBE`). If the inner value
1555    /// is has a timestamp, it must only do other peeks. However, if it doesn't
1556    /// have a timestamp (i.e. the values are constants), the transaction can still
1557    /// perform writes.
1558    Peeks {
1559        /// The timestamp and timestamp related metadata for the peek.
1560        determination: TimestampDetermination<T>,
1561        /// The cluster used to execute peeks.
1562        cluster_id: ClusterId,
1563        /// Whether this peek needs to be linearized.
1564        requires_linearization: RequireLinearization,
1565    },
1566    /// This transaction has done a `SUBSCRIBE` and must do nothing else.
1567    Subscribe,
1568    /// This transaction has had a write (`INSERT`, `UPDATE`, `DELETE`) and must
1569    /// only do other writes, or reads whose timestamp is None (i.e. constants).
1570    Writes(Vec<WriteOp>),
1571    /// This transaction has a prospective statement that will execute during commit.
1572    SingleStatement {
1573        /// The prospective statement.
1574        stmt: Arc<Statement<Raw>>,
1575        /// The statement params.
1576        params: mz_sql::plan::Params,
1577    },
1578    /// This transaction has run some _simple_ DDL and must do nothing else. Any statement/plan that
1579    /// uses this must return false in `must_serialize_ddl()` because this is serialized instead in
1580    /// `sequence_plan()` during `COMMIT`.
1581    DDL {
1582        /// Catalog operations that have already run, and must run before each subsequent op.
1583        ops: Vec<crate::catalog::Op>,
1584        /// In-memory state that reflects the previously applied ops.
1585        state: CatalogState,
1586        /// A list of side effects that should be executed if this DDL transaction commits.
1587        #[derivative(Debug = "ignore")]
1588        side_effects: Vec<
1589            Box<
1590                dyn for<'a> FnOnce(
1591                        &'a mut Coordinator,
1592                        Option<&'a mut ExecuteContext>,
1593                    ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
1594                    + Send
1595                    + Sync,
1596            >,
1597        >,
1598        /// Transient revision of the `Catalog` when this transaction started.
1599        revision: u64,
1600    },
1601}
1602
1603impl<T> TransactionOps<T> {
1604    fn timestamp_determination(self) -> Option<TimestampDetermination<T>> {
1605        match self {
1606            TransactionOps::Peeks { determination, .. } => Some(determination),
1607            TransactionOps::None
1608            | TransactionOps::Subscribe
1609            | TransactionOps::Writes(_)
1610            | TransactionOps::SingleStatement { .. }
1611            | TransactionOps::DDL { .. } => None,
1612        }
1613    }
1614}
1615
1616impl<T> Default for TransactionOps<T> {
1617    fn default() -> Self {
1618        Self::None
1619    }
1620}
1621
1622/// An `INSERT` waiting to be committed.
1623#[derive(Debug, Clone, PartialEq)]
1624pub struct WriteOp {
1625    /// The target table.
1626    pub id: CatalogItemId,
1627    /// The data rows.
1628    pub rows: TableData,
1629}
1630
/// Whether a transaction requires linearization.
///
/// This is a fieldless flag, so in addition to `Debug` it derives `Clone`, `Copy`,
/// `PartialEq` and `Eq`: it can be passed by value and compared with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequireLinearization {
    /// Linearization is required.
    Required,
    /// Linearization is not required.
    NotRequired,
}
1639
1640impl From<&ExplainContext> for RequireLinearization {
1641    fn from(ctx: &ExplainContext) -> Self {
1642        match ctx {
1643            ExplainContext::None | ExplainContext::PlanInsightsNotice(_) => {
1644                RequireLinearization::Required
1645            }
1646            _ => RequireLinearization::NotRequired,
1647        }
1648    }
1649}
1650
1651/// A complete set of exclusive locks for writing to collections identified by [`CatalogItemId`]s.
1652///
1653/// To prevent deadlocks between two sessions, we do not allow acquiring a partial set of locks.
1654#[derive(Debug)]
1655pub struct WriteLocks {
1656    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
1657    /// Connection that currently holds these locks, used for tracing purposes only.
1658    conn_id: ConnectionId,
1659}
1660
1661impl WriteLocks {
1662    /// Create a [`WriteLocksBuilder`] pre-defining all of the locks we need.
1663    ///
1664    /// When "finishing" the builder with [`WriteLocksBuilder::all_or_nothing`], if we haven't
1665    /// acquired all of the necessary locks we drop any partially acquired ones.
1666    pub fn builder(sources: impl IntoIterator<Item = CatalogItemId>) -> WriteLocksBuilder {
1667        let locks = sources.into_iter().map(|gid| (gid, None)).collect();
1668        WriteLocksBuilder { locks }
1669    }
1670
1671    /// Validate this set of [`WriteLocks`] is sufficient for the provided collections.
1672    /// Dropping the currently held locks if it's not.
1673    pub fn validate(
1674        self,
1675        collections: impl Iterator<Item = CatalogItemId>,
1676    ) -> Result<Self, BTreeSet<CatalogItemId>> {
1677        let mut missing = BTreeSet::new();
1678        for collection in collections {
1679            if !self.locks.contains_key(&collection) {
1680                missing.insert(collection);
1681            }
1682        }
1683
1684        if missing.is_empty() {
1685            Ok(self)
1686        } else {
1687            // Explicitly drop the already acquired locks.
1688            drop(self);
1689            Err(missing)
1690        }
1691    }
1692}
1693
1694impl Drop for WriteLocks {
1695    fn drop(&mut self) {
1696        // We may have merged the locks into GroupCommitWriteLocks, thus it could be empty.
1697        if !self.locks.is_empty() {
1698            tracing::info!(
1699                conn_id = %self.conn_id,
1700                locks = ?self.locks,
1701                "dropping write locks",
1702            );
1703        }
1704    }
1705}
1706
1707/// A builder struct that helps us acquire all of the locks we need, or none of them.
1708///
1709/// See [`WriteLocks::builder`].
1710#[derive(Debug)]
1711pub struct WriteLocksBuilder {
1712    locks: BTreeMap<CatalogItemId, Option<tokio::sync::OwnedMutexGuard<()>>>,
1713}
1714
1715impl WriteLocksBuilder {
1716    /// Adds a lock to this builder.
1717    pub fn insert_lock(&mut self, id: CatalogItemId, lock: tokio::sync::OwnedMutexGuard<()>) {
1718        self.locks.insert(id, Some(lock));
1719    }
1720
1721    /// Finish this builder by returning either all of the necessary locks, or none of them.
1722    ///
1723    /// If we fail to acquire all of the locks, returns one of the [`CatalogItemId`]s that we
1724    /// failed to acquire a lock for, that should be awaited so we know when to run again.
1725    pub fn all_or_nothing(self, conn_id: &ConnectionId) -> Result<WriteLocks, CatalogItemId> {
1726        let (locks, missing): (BTreeMap<_, _>, BTreeSet<_>) =
1727            self.locks
1728                .into_iter()
1729                .partition_map(|(gid, lock)| match lock {
1730                    Some(lock) => itertools::Either::Left((gid, lock)),
1731                    None => itertools::Either::Right(gid),
1732                });
1733
1734        match missing.iter().next() {
1735            None => {
1736                tracing::info!(%conn_id, ?locks, "acquired write locks");
1737                Ok(WriteLocks {
1738                    locks,
1739                    conn_id: conn_id.clone(),
1740                })
1741            }
1742            Some(gid) => {
1743                tracing::info!(?missing, "failed to acquire write locks");
1744                // Explicitly drop the already acquired locks.
1745                drop(locks);
1746                Err(*gid)
1747            }
1748        }
1749    }
1750}
1751
1752/// Collection of [`WriteLocks`] gathered during [`group_commit`].
1753///
1754/// Note: This struct should __never__ be used outside of group commit because it attempts to merge
1755/// together several collections of [`WriteLocks`] which if not done carefully can cause deadlocks
1756/// or consistency violations.
1757///
1758/// We must prevent writes from occurring to tables during read then write plans (e.g. `UPDATE`)
1759/// but we can allow blind writes (e.g. `INSERT`) to get committed concurrently at the same
1760/// timestamp when submitting the updates from a read then write plan.
1761///
1762/// Naively it would seem as though we could allow blind writes to occur whenever as blind writes
1763/// could never cause invalid retractions, but it could cause us to violate serializability because
1764/// there is no total order we could define for the transactions. Consider the following scenario:
1765///
1766/// ```text
1767/// table: foo
1768///
1769///  a | b
1770/// --------
1771///  x   2
1772///  y   3
1773///  z   4
1774///
1775/// -- Session(A)
1776/// -- read then write plan, reads at t0, writes at t3, transaction Ta
1777/// DELETE FROM foo WHERE b % 2 = 0;
1778///
1779///
1780/// -- Session(B)
1781/// -- blind write into foo, writes at t1, transaction Tb
1782/// INSERT INTO foo VALUES ('q', 6);
1783/// -- select from foo, reads at t2, transaction Tc
1784/// SELECT * FROM foo;
1785///
1786///
1787/// The times these operations occur at are ordered:
1788/// t0 < t1 < t2 < t3
1789///
1790/// Given the timing of the operations, the transactions must have the following order:
1791///
1792/// * Ta does not observe ('q', 6), so Ta < Tb
1793/// * Tc does observe ('q', 6), so Tb < Tc
1794/// * Tc does not observe the retractions from Ta, so Tc < Ta
1795///
1796/// For total order to exist, Ta < Tb < Tc < Ta, which is impossible.
1797/// ```
1798///
1799/// [`group_commit`]: super::coord::Coordinator::group_commit
1800#[derive(Debug, Default)]
1801pub(crate) struct GroupCommitWriteLocks {
1802    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
1803}
1804
1805impl GroupCommitWriteLocks {
1806    /// Merge a set of [`WriteLocks`] into this collection for group commit.
1807    pub fn merge(&mut self, mut locks: WriteLocks) {
1808        // Note: Ideally we would use `.drain`, but that method doesn't exist for BTreeMap.
1809        //
1810        // See: <https://github.com/rust-lang/rust/issues/81074>
1811        let existing = std::mem::take(&mut locks.locks);
1812        self.locks.extend(existing);
1813    }
1814
1815    /// Returns the collections we're missing locks for, if any.
1816    pub fn missing_locks(
1817        &self,
1818        writes: impl Iterator<Item = CatalogItemId>,
1819    ) -> BTreeSet<CatalogItemId> {
1820        let mut missing = BTreeSet::new();
1821        for write in writes {
1822            if !self.locks.contains_key(&write) {
1823                missing.insert(write);
1824            }
1825        }
1826        missing
1827    }
1828}
1829
1830impl Drop for GroupCommitWriteLocks {
1831    fn drop(&mut self) {
1832        if !self.locks.is_empty() {
1833            tracing::info!(
1834                locks = ?self.locks,
1835                "dropping group commit write locks",
1836            );
1837        }
1838    }
1839}