1#![warn(missing_docs)]
13
14use std::collections::btree_map::Entry;
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Debug;
17use std::future::Future;
18use std::mem;
19use std::net::IpAddr;
20use std::pin::Pin;
21use std::sync::Arc;
22
23use chrono::{DateTime, Utc};
24use derivative::Derivative;
25use itertools::Itertools;
26use mz_adapter_types::connection::ConnectionId;
27use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
28use mz_controller_types::ClusterId;
29use mz_ore::metrics::{MetricsFutureExt, MetricsRegistry};
30use mz_ore::now::{EpochMillis, NowFn};
31use mz_pgwire_common::Format;
32use mz_repr::role_id::RoleId;
33use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
34use mz_repr::{CatalogItemId, Datum, Row, RowIterator, SqlScalarType, TimestampManipulation};
35use mz_sql::ast::{AstInfo, Raw, Statement, TransactionAccessMode};
36use mz_sql::plan::{Params, PlanContext, QueryWhen, StatementDesc};
37use mz_sql::session::metadata::SessionMetadata;
38use mz_sql::session::user::{
39 INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER, RoleMetadata, SYSTEM_USER, User,
40};
41use mz_sql::session::vars::IsolationLevel;
42pub use mz_sql::session::vars::{
43 DEFAULT_DATABASE_NAME, EndTransactionAction, SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION,
44 SERVER_PATCH_VERSION, SessionVars, Var,
45};
46use mz_sql_parser::ast::TransactionIsolationLevel;
47use mz_storage_client::client::TableData;
48use mz_storage_types::sources::Timeline;
49use qcell::{QCell, QCellOwner};
50use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
51use tokio::sync::watch;
52use uuid::Uuid;
53
54use crate::catalog::CatalogState;
55use crate::client::RecordFirstRowStream;
56use crate::coord::appends::BuiltinTableAppendNotify;
57use crate::coord::in_memory_oracle::InMemoryTimestampOracle;
58use crate::coord::peek::PeekResponseUnary;
59use crate::coord::statement_logging::PreparedStatementLoggingInfo;
60use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
61use crate::coord::{Coordinator, ExplainContext};
62use crate::error::AdapterError;
63use crate::metrics::{Metrics, SessionMetrics};
64use crate::{AdapterNotice, ExecuteContext};
65
/// Connection ID reserved for the session built by `Session::dummy`.
///
/// `Session::new` asserts that real sessions never use this ID.
const DUMMY_CONNECTION_ID: ConnectionId = ConnectionId::Static(0);
67
/// Per-connection session state.
///
/// Holds the session's variables, prepared statements, portals, the status of
/// the current transaction, queued notices, and per-timeline in-memory
/// timestamp oracles. `T` is the timestamp type used by transactions and
/// oracles.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Session<T = mz_repr::Timestamp>
where
    T: Debug + Clone + Send + Sync,
{
    /// Identifier of the connection this session belongs to.
    conn_id: ConnectionId,
    /// Unique identifier of this session (also passed to statement logging).
    uuid: Uuid,
    /// Prepared statements, keyed by statement name.
    prepared_statements: BTreeMap<String, PreparedStatement>,
    /// Portals, keyed by portal name.
    portals: BTreeMap<String, Portal>,
    /// Status of the current transaction.
    transaction: TransactionStatus<T>,
    /// Session-level plan context; reset to `None` by `clear_transaction`.
    /// NOTE(review): not read in the visible code — the plan context exposed
    /// through `SessionMetadata::pcx` comes from the transaction instead.
    pcx: Option<PlanContext>,
    /// Session-level metrics.
    metrics: SessionMetrics,
    /// Pending notification for builtin table writes, awaited via
    /// `clear_builtin_table_updates`.
    #[derivative(Debug = "ignore")]
    builtin_updates: Option<BuiltinTableAppendNotify>,

    /// Role information for the session; `None` until
    /// `initialize_role_metadata` is called.
    role_metadata: Option<RoleMetadata>,
    /// IP address of the client, if known.
    client_ip: Option<IpAddr>,
    /// Session variables.
    vars: SessionVars,
    /// Sending half of the session's notice channel.
    notices_tx: mpsc::UnboundedSender<AdapterNotice>,
    /// Receiving half of the session's notice channel.
    notices_rx: mpsc::UnboundedReceiver<AdapterNotice>,
    /// ID to assign to the next transaction started in this session.
    next_transaction_id: TransactionId,
    /// Random secret key generated when the session is created.
    secret_key: u32,
    /// Source of external user metadata updates, if any.
    external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    /// Owner token for the `QCell`s that hold statement logging info.
    #[derivative(Debug = "ignore")]
    qcell_owner: QCellOwner,
    /// Session-local in-memory timestamp oracles, one per timeline.
    session_oracles: BTreeMap<Timeline, InMemoryTimestampOracle<T, NowFn<T>>>,
    /// Counter bumped whenever transaction or portal state changes.
    state_revision: u64,
}
128
129impl<T> SessionMetadata for Session<T>
130where
131 T: Debug + Clone + Send + Sync,
132 T: TimestampManipulation,
133{
134 fn conn_id(&self) -> &ConnectionId {
135 &self.conn_id
136 }
137
138 fn client_ip(&self) -> Option<&IpAddr> {
139 self.client_ip.as_ref()
140 }
141
142 fn pcx(&self) -> &PlanContext {
143 &self
144 .transaction()
145 .inner()
146 .expect("no active transaction")
147 .pcx
148 }
149
150 fn role_metadata(&self) -> &RoleMetadata {
151 self.role_metadata
152 .as_ref()
153 .expect("role_metadata invariant violated")
154 }
155
156 fn vars(&self) -> &SessionVars {
157 &self.vars
158 }
159}
160
/// An owned snapshot of a session's metadata, produced by `Session::meta`.
///
/// It holds clones of the session's values, so it implements
/// [`SessionMetadata`] without borrowing the originating session.
#[derive(Debug)]
pub struct SessionMeta {
    /// Identifier of the connection the session belongs to.
    conn_id: ConnectionId,
    /// IP address of the client, if known.
    client_ip: Option<IpAddr>,
    /// Plan context of the transaction that was active at snapshot time.
    pcx: PlanContext,
    /// Role metadata of the session.
    role_metadata: RoleMetadata,
    /// Session variables at snapshot time.
    vars: SessionVars,
}
171
172impl SessionMetadata for SessionMeta {
173 fn vars(&self) -> &SessionVars {
174 &self.vars
175 }
176
177 fn conn_id(&self) -> &ConnectionId {
178 &self.conn_id
179 }
180
181 fn client_ip(&self) -> Option<&IpAddr> {
182 self.client_ip.as_ref()
183 }
184
185 fn pcx(&self) -> &PlanContext {
186 &self.pcx
187 }
188
189 fn role_metadata(&self) -> &RoleMetadata {
190 &self.role_metadata
191 }
192}
193
/// Configuration used to create a [`Session`].
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// Identifier of the connection the session belongs to. Must not be the
    /// reserved dummy connection ID (`Session::new` asserts this).
    pub conn_id: ConnectionId,
    /// Unique identifier for the session.
    pub uuid: Uuid,
    /// IP address of the client, if known.
    pub client_ip: Option<IpAddr>,
    /// Name of the user. It is also used to look up a default cluster for
    /// internal users.
    pub user: String,
    /// Channel providing external user metadata; its current value seeds the
    /// session's user, and later changes are picked up by
    /// `Session::apply_external_metadata_updates`.
    pub external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    /// Internal metadata for the user, if any.
    pub internal_user_metadata: Option<InternalUserMetadata>,
    /// Helm chart version, passed through to the session variables.
    pub helm_chart_version: Option<String>,
}
218
219impl<T: TimestampManipulation> Session<T> {
220 pub(crate) fn new(
222 build_info: &'static BuildInfo,
223 config: SessionConfig,
224 metrics: SessionMetrics,
225 ) -> Session<T> {
226 assert_ne!(config.conn_id, DUMMY_CONNECTION_ID);
227 Self::new_internal(build_info, config, metrics)
228 }
229
230 pub fn meta(&self) -> SessionMeta {
233 SessionMeta {
234 conn_id: self.conn_id().clone(),
235 client_ip: self.client_ip().copied(),
236 pcx: self.pcx().clone(),
237 role_metadata: self.role_metadata().clone(),
238 vars: self.vars.clone(),
239 }
240
241 }
243
244 pub(crate) fn mint_logging<A: AstInfo>(
254 &self,
255 raw_sql: String,
256 stmt: Option<&Statement<A>>,
257 now: EpochMillis,
258 ) -> Arc<QCell<PreparedStatementLoggingInfo>> {
259 Arc::new(QCell::new(
260 &self.qcell_owner,
261 PreparedStatementLoggingInfo::still_to_log(
262 raw_sql,
263 stmt,
264 now,
265 "".to_string(),
266 self.uuid,
267 false,
268 ),
269 ))
270 }
271
272 pub(crate) fn qcell_ro<'a, T2: 'a>(&'a self, cell: &'a Arc<QCell<T2>>) -> &'a T2 {
273 self.qcell_owner.ro(&*cell)
274 }
275
276 pub(crate) fn qcell_rw<'a, T2: 'a>(&'a mut self, cell: &'a Arc<QCell<T2>>) -> &'a mut T2 {
277 self.qcell_owner.rw(&*cell)
278 }
279
280 pub fn uuid(&self) -> Uuid {
283 self.uuid
284 }
285
286 pub fn dummy() -> Session<T> {
291 let registry = MetricsRegistry::new();
292 let metrics = Metrics::register_into(®istry);
293 let metrics = metrics.session_metrics();
294 let mut dummy = Self::new_internal(
295 &DUMMY_BUILD_INFO,
296 SessionConfig {
297 conn_id: DUMMY_CONNECTION_ID,
298 uuid: Uuid::new_v4(),
299 user: SYSTEM_USER.name.clone(),
300 client_ip: None,
301 external_metadata_rx: None,
302 internal_user_metadata: None,
303 helm_chart_version: None,
304 },
305 metrics,
306 );
307 dummy.initialize_role_metadata(RoleId::User(0));
308 dummy
309 }
310
311 fn new_internal(
312 build_info: &'static BuildInfo,
313 SessionConfig {
314 conn_id,
315 uuid,
316 user,
317 client_ip,
318 mut external_metadata_rx,
319 internal_user_metadata,
320 helm_chart_version,
321 }: SessionConfig,
322 metrics: SessionMetrics,
323 ) -> Session<T> {
324 let (notices_tx, notices_rx) = mpsc::unbounded_channel();
325 let default_cluster = INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER.get(&user);
326 let user = User {
327 name: user,
328 internal_metadata: internal_user_metadata,
329 external_metadata: external_metadata_rx
330 .as_mut()
331 .map(|rx| rx.borrow_and_update().clone()),
332 };
333 let mut vars = SessionVars::new_unchecked(build_info, user, helm_chart_version);
334 if let Some(default_cluster) = default_cluster {
335 vars.set_cluster(default_cluster.clone());
336 }
337 Session {
338 conn_id,
339 uuid,
340 transaction: TransactionStatus::Default,
341 pcx: None,
342 metrics,
343 builtin_updates: None,
344 prepared_statements: BTreeMap::new(),
345 portals: BTreeMap::new(),
346 role_metadata: None,
347 client_ip,
348 vars,
349 notices_tx,
350 notices_rx,
351 next_transaction_id: 0,
352 secret_key: rand::random(),
353 external_metadata_rx,
354 qcell_owner: QCellOwner::new(),
355 session_oracles: BTreeMap::new(),
356 state_revision: 0,
357 }
358 }
359
360 pub fn secret_key(&self) -> u32 {
362 self.secret_key
363 }
364
365 fn new_pcx(&self, mut wall_time: DateTime<Utc>) -> PlanContext {
366 if let Some(mock_time) = self.vars().unsafe_new_transaction_wall_time() {
367 wall_time = *mock_time;
368 }
369 PlanContext::new(wall_time)
370 }
371
372 pub fn start_transaction(
375 &mut self,
376 wall_time: DateTime<Utc>,
377 access: Option<TransactionAccessMode>,
378 isolation_level: Option<TransactionIsolationLevel>,
379 ) -> Result<(), AdapterError> {
380 if let Some(txn) = self.transaction.inner() {
382 let read_write_prohibited = match txn.ops {
386 TransactionOps::Peeks { .. } | TransactionOps::Subscribe => {
387 txn.access == Some(TransactionAccessMode::ReadOnly)
388 }
389 TransactionOps::None
390 | TransactionOps::Writes(_)
391 | TransactionOps::SingleStatement { .. }
392 | TransactionOps::DDL { .. } => false,
393 };
394
395 if read_write_prohibited && access == Some(TransactionAccessMode::ReadWrite) {
396 return Err(AdapterError::ReadWriteUnavailable);
397 }
398 }
399
400 match std::mem::take(&mut self.transaction) {
401 TransactionStatus::Default => {
402 let id = self.next_transaction_id;
403 self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
404 self.transaction = TransactionStatus::InTransaction(Transaction {
405 pcx: self.new_pcx(wall_time),
406 ops: TransactionOps::None,
407 write_lock_guards: None,
408 access,
409 id,
410 });
411 }
412 TransactionStatus::Started(mut txn)
413 | TransactionStatus::InTransactionImplicit(mut txn)
414 | TransactionStatus::InTransaction(mut txn) => {
415 if access.is_some() {
416 txn.access = access;
417 }
418 self.transaction = TransactionStatus::InTransaction(txn);
419 }
420 TransactionStatus::Failed(_) => unreachable!(),
421 };
422
423 if let Some(isolation_level) = isolation_level {
424 self.vars
425 .set_local_transaction_isolation(isolation_level.into());
426 }
427
428 Ok(())
429 }
430
431 pub fn start_transaction_implicit(&mut self, wall_time: DateTime<Utc>, stmts: usize) {
434 if let TransactionStatus::Default = self.transaction {
435 let id = self.next_transaction_id;
436 self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
437 let txn = Transaction {
438 pcx: self.new_pcx(wall_time),
439 ops: TransactionOps::None,
440 write_lock_guards: None,
441 access: None,
442 id,
443 };
444 match stmts {
445 1 => self.transaction = TransactionStatus::Started(txn),
446 n if n > 1 => self.transaction = TransactionStatus::InTransactionImplicit(txn),
447 _ => {}
448 }
449 }
450 }
451
452 pub fn start_transaction_single_stmt(&mut self, wall_time: DateTime<Utc>) {
454 self.start_transaction_implicit(wall_time, 1);
455 }
456
457 #[must_use]
467 pub fn clear_transaction(&mut self) -> TransactionStatus<T> {
468 self.portals.clear();
469 self.pcx = None;
470 self.state_revision += 1;
471 mem::take(&mut self.transaction)
472 }
473
474 pub fn fail_transaction(mut self) -> Self {
476 match self.transaction {
477 TransactionStatus::Default => unreachable!(),
478 TransactionStatus::Started(txn)
479 | TransactionStatus::InTransactionImplicit(txn)
480 | TransactionStatus::InTransaction(txn) => {
481 self.transaction = TransactionStatus::Failed(txn);
482 }
483 TransactionStatus::Failed(_) => {}
484 };
485 self
486 }
487
488 pub fn transaction(&self) -> &TransactionStatus<T> {
490 &self.transaction
491 }
492
493 pub fn transaction_mut(&mut self) -> &mut TransactionStatus<T> {
495 &mut self.transaction
496 }
497
498 pub fn transaction_code(&self) -> TransactionCode {
500 self.transaction().into()
501 }
502
503 pub fn add_transaction_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
507 self.transaction.add_ops(add_ops)
508 }
509
510 pub fn retain_notice_transmitter(&self) -> UnboundedSender<AdapterNotice> {
512 self.notices_tx.clone()
513 }
514
515 pub fn add_notice(&self, notice: AdapterNotice) {
517 self.add_notices([notice])
518 }
519
520 pub fn add_notices(&self, notices: impl IntoIterator<Item = AdapterNotice>) {
522 for notice in notices {
523 let _ = self.notices_tx.send(notice);
524 }
525 }
526
527 pub async fn recv_notice(&mut self) -> AdapterNotice {
531 loop {
533 let notice = self
534 .notices_rx
535 .recv()
536 .await
537 .expect("Session also holds a sender, so recv won't ever return None");
538 match self.notice_filter(notice) {
539 Some(notice) => return notice,
540 None => continue,
541 }
542 }
543 }
544
545 pub fn drain_notices(&mut self) -> Vec<AdapterNotice> {
547 let mut notices = Vec::new();
548 while let Ok(notice) = self.notices_rx.try_recv() {
549 if let Some(notice) = self.notice_filter(notice) {
550 notices.push(notice);
551 }
552 }
553 notices
554 }
555
556 fn notice_filter(&self, notice: AdapterNotice) -> Option<AdapterNotice> {
558 let minimum_client_severity = self.vars.client_min_messages();
560 let sev = notice.severity();
561 if !minimum_client_severity.should_output_to_client(&sev) {
562 return None;
563 }
564 if let AdapterNotice::ClusterReplicaStatusChanged { cluster, .. } = ¬ice {
566 if cluster != self.vars.cluster() {
567 return None;
568 }
569 }
570 Some(notice)
571 }
572
573 pub fn clear_transaction_ops(&mut self) {
576 if let Some(txn) = self.transaction.inner_mut() {
577 txn.ops = TransactionOps::None;
578 }
579 }
580
581 pub fn take_transaction_timestamp_context(&mut self) -> Option<TimestampContext<T>> {
586 if let Some(Transaction { ops, .. }) = self.transaction.inner_mut() {
587 if let TransactionOps::Peeks { .. } = ops {
588 let ops = std::mem::take(ops);
589 Some(
590 ops.timestamp_determination()
591 .expect("checked above")
592 .timestamp_context,
593 )
594 } else {
595 None
596 }
597 } else {
598 None
599 }
600 }
601
602 pub fn get_transaction_timestamp_determination(&self) -> Option<TimestampDetermination<T>> {
607 match self.transaction.inner() {
608 Some(Transaction {
609 pcx: _,
610 ops: TransactionOps::Peeks { determination, .. },
611 write_lock_guards: _,
612 access: _,
613 id: _,
614 }) => Some(determination.clone()),
615 _ => None,
616 }
617 }
618
619 pub fn contains_read_timestamp(&self) -> bool {
621 matches!(
622 self.transaction.inner(),
623 Some(Transaction {
624 pcx: _,
625 ops: TransactionOps::Peeks {
626 determination: TimestampDetermination {
627 timestamp_context: TimestampContext::TimelineTimestamp { .. },
628 ..
629 },
630 ..
631 },
632 write_lock_guards: _,
633 access: _,
634 id: _,
635 })
636 )
637 }
638
639 pub fn set_prepared_statement(
641 &mut self,
642 name: String,
643 stmt: Option<Statement<Raw>>,
644 raw_sql: String,
645 desc: StatementDesc,
646 state_revision: StateRevision,
647 now: EpochMillis,
648 ) {
649 let logging = PreparedStatementLoggingInfo::still_to_log(
650 raw_sql,
651 stmt.as_ref(),
652 now,
653 name.clone(),
654 self.uuid,
655 false,
656 );
657 let statement = PreparedStatement {
658 stmt,
659 desc,
660 state_revision,
661 logging: Arc::new(QCell::new(&self.qcell_owner, logging)),
662 };
663 self.prepared_statements.insert(name, statement);
664 }
665
666 pub fn remove_prepared_statement(&mut self, name: &str) -> bool {
670 self.prepared_statements.remove(name).is_some()
671 }
672
673 pub fn remove_all_prepared_statements(&mut self) {
675 self.prepared_statements.clear();
676 }
677
678 pub fn get_prepared_statement_unverified(&self, name: &str) -> Option<&PreparedStatement> {
683 self.prepared_statements.get(name)
684 }
685
686 pub fn get_prepared_statement_mut_unverified(
691 &mut self,
692 name: &str,
693 ) -> Option<&mut PreparedStatement> {
694 self.prepared_statements.get_mut(name)
695 }
696
697 pub fn prepared_statements(&self) -> &BTreeMap<String, PreparedStatement> {
699 &self.prepared_statements
700 }
701
702 pub fn portals(&self) -> &BTreeMap<String, Portal> {
704 &self.portals
705 }
706
707 pub fn set_portal(
717 &mut self,
718 portal_name: String,
719 desc: StatementDesc,
720 stmt: Option<Statement<Raw>>,
721 logging: Arc<QCell<PreparedStatementLoggingInfo>>,
722 params: Vec<(Datum, SqlScalarType)>,
723 result_formats: Vec<Format>,
724 state_revision: StateRevision,
725 ) -> Result<(), AdapterError> {
726 if !portal_name.is_empty() && self.portals.contains_key(&portal_name) {
728 return Err(AdapterError::DuplicateCursor(portal_name));
729 }
730 self.state_revision += 1;
731 let param_types = desc.param_types.clone();
732 self.portals.insert(
733 portal_name,
734 Portal {
735 stmt: stmt.map(Arc::new),
736 desc,
737 state_revision,
738 parameters: Params {
739 datums: Row::pack(params.iter().map(|(d, _t)| d)),
740 execute_types: params.into_iter().map(|(_d, t)| t).collect(),
741 expected_types: param_types,
742 },
743 result_formats,
744 state: PortalState::NotStarted,
745 logging,
746 lifecycle_timestamps: None,
747 },
748 );
749 Ok(())
750 }
751
752 pub fn remove_portal(&mut self, portal_name: &str) -> bool {
756 self.state_revision += 1;
757 self.portals.remove(portal_name).is_some()
758 }
759
760 pub fn get_portal_unverified(&self, portal_name: &str) -> Option<&Portal> {
764 self.portals.get(portal_name)
765 }
766
767 pub fn get_portal_unverified_mut(&mut self, portal_name: &str) -> Option<PortalRefMut<'_>> {
774 self.portals.get_mut(portal_name).map(|p| PortalRefMut {
775 stmt: &p.stmt,
776 desc: &p.desc,
777 state_revision: &mut p.state_revision,
778 parameters: &mut p.parameters,
779 result_formats: &mut p.result_formats,
780 logging: &mut p.logging,
781 state: &mut p.state,
782 lifecycle_timestamps: &mut p.lifecycle_timestamps,
783 })
784 }
785
786 pub fn create_new_portal(
788 &mut self,
789 stmt: Option<Statement<Raw>>,
790 logging: Arc<QCell<PreparedStatementLoggingInfo>>,
791 desc: StatementDesc,
792 parameters: Params,
793 result_formats: Vec<Format>,
794 state_revision: StateRevision,
795 ) -> Result<String, AdapterError> {
796 self.state_revision += 1;
797
798 for i in 0usize.. {
800 let name = format!("<unnamed portal {}>", i);
801 match self.portals.entry(name.clone()) {
802 Entry::Occupied(_) => continue,
803 Entry::Vacant(entry) => {
804 entry.insert(Portal {
805 stmt: stmt.map(Arc::new),
806 desc,
807 state_revision,
808 parameters,
809 result_formats,
810 state: PortalState::NotStarted,
811 logging,
812 lifecycle_timestamps: None,
813 });
814 return Ok(name);
815 }
816 }
817 }
818
819 coord_bail!("unable to create a new portal");
820 }
821
822 pub fn reset(&mut self) {
825 let _ = self.clear_transaction();
826 self.prepared_statements.clear();
827 self.vars.reset_all();
828 }
829
830 pub fn application_name(&self) -> &str {
834 self.vars.application_name()
835 }
836
837 pub fn vars(&self) -> &SessionVars {
839 &self.vars
840 }
841
842 pub fn vars_mut(&mut self) -> &mut SessionVars {
844 &mut self.vars
845 }
846
847 pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
853 self.transaction.try_grant_write_locks(guards)
854 }
855
856 pub fn apply_external_metadata_updates(&mut self) {
858 let Some(rx) = &mut self.external_metadata_rx else {
860 return;
861 };
862
863 if !rx.has_changed().unwrap_or(false) {
865 return;
866 }
867
868 let metadata = rx.borrow_and_update().clone();
871 self.vars.set_external_user_metadata(metadata);
872 }
873
874 pub fn initialize_role_metadata(&mut self, role_id: RoleId) {
876 self.role_metadata = Some(RoleMetadata::new(role_id));
877 }
878
879 pub fn ensure_timestamp_oracle(
882 &mut self,
883 timeline: Timeline,
884 ) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
885 self.session_oracles
886 .entry(timeline)
887 .or_insert_with(|| InMemoryTimestampOracle::new(T::minimum(), NowFn::from(T::minimum)))
888 }
889
890 pub fn ensure_local_timestamp_oracle(&mut self) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
893 self.ensure_timestamp_oracle(Timeline::EpochMilliseconds)
894 }
895
896 pub fn get_timestamp_oracle(
898 &self,
899 timeline: &Timeline,
900 ) -> Option<&InMemoryTimestampOracle<T, NowFn<T>>> {
901 self.session_oracles.get(timeline)
902 }
903
904 pub fn apply_write(&mut self, timestamp: T) {
907 if self.vars().transaction_isolation() == &IsolationLevel::StrongSessionSerializable {
908 self.ensure_local_timestamp_oracle().apply_write(timestamp);
909 }
910 }
911
912 pub fn metrics(&self) -> &SessionMetrics {
914 &self.metrics
915 }
916
917 pub fn set_builtin_table_updates(&mut self, fut: BuiltinTableAppendNotify) {
919 let prev = self.builtin_updates.replace(fut);
920 mz_ore::soft_assert_or_log!(prev.is_none(), "replacing old builtin table notify");
921 }
922
923 pub fn clear_builtin_table_updates(&mut self) -> Option<impl Future<Output = ()> + 'static> {
926 if let Some(fut) = self.builtin_updates.take() {
927 let histogram = self
929 .metrics()
930 .session_startup_table_writes_seconds()
931 .clone();
932 Some(async move {
933 fut.wall_time().observe(histogram).await;
934 })
935 } else {
936 None
937 }
938 }
939
940 pub fn state_revision(&self) -> u64 {
943 self.state_revision
944 }
945}
946
/// A prepared statement stored in a [`Session`].
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct PreparedStatement {
    /// The parsed statement. NOTE(review): presumably `None` for an empty
    /// query; confirm with callers.
    stmt: Option<Statement<Raw>>,
    /// Description of the statement.
    desc: StatementDesc,
    /// Revision of catalog and session state recorded when the statement was
    /// prepared.
    pub state_revision: StateRevision,
    /// Statement logging info, shared via `Arc` and accessed through the
    /// owning session's `QCellOwner`.
    #[derivative(Debug = "ignore")]
    logging: Arc<QCell<PreparedStatementLoggingInfo>>,
}
958
959impl PreparedStatement {
960 pub fn stmt(&self) -> Option<&Statement<Raw>> {
963 self.stmt.as_ref()
964 }
965
966 pub fn desc(&self) -> &StatementDesc {
968 &self.desc
969 }
970
971 pub fn logging(&self) -> &Arc<QCell<PreparedStatementLoggingInfo>> {
973 &self.logging
974 }
975}
976
/// A portal: a bound statement together with its parameters and execution
/// state.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Portal {
    /// The statement to execute, if any.
    pub stmt: Option<Arc<Statement<Raw>>>,
    /// Description of the statement.
    pub desc: StatementDesc,
    /// Revision of catalog and session state recorded when the portal was
    /// created.
    pub state_revision: StateRevision,
    /// Bound parameter values and their types.
    pub parameters: Params,
    /// Requested output format for each result column.
    pub result_formats: Vec<Format>,
    /// Statement logging info, shared via `Arc` and accessed through the
    /// owning session's `QCellOwner`.
    #[derivative(Debug = "ignore")]
    pub logging: Arc<QCell<PreparedStatementLoggingInfo>>,
    /// Execution state of the portal.
    #[derivative(Debug = "ignore")]
    pub state: PortalState,
    /// Lifecycle timestamps, if recorded. `None` when the portal is created.
    pub lifecycle_timestamps: Option<LifecycleTimestamps>,
}
1000
/// A view of a [`Portal`]'s fields in which `stmt` and `desc` are read-only
/// and everything else is mutable.
///
/// Returned by `Session::get_portal_unverified_mut`.
pub struct PortalRefMut<'a> {
    /// The statement to execute, if any.
    pub stmt: &'a Option<Arc<Statement<Raw>>>,
    /// Description of the statement.
    pub desc: &'a StatementDesc,
    /// Revision of catalog and session state for the portal.
    pub state_revision: &'a mut StateRevision,
    /// Bound parameter values and their types.
    pub parameters: &'a mut Params,
    /// Requested output format for each result column.
    pub result_formats: &'a mut Vec<Format>,
    /// Statement logging info.
    pub logging: &'a mut Arc<QCell<PreparedStatementLoggingInfo>>,
    /// Execution state of the portal.
    pub state: &'a mut PortalState,
    /// Lifecycle timestamps, if recorded.
    pub lifecycle_timestamps: &'a mut Option<LifecycleTimestamps>,
}
1023
/// Revision of catalog and session state recorded with a prepared statement
/// or portal.
///
/// NOTE(review): presumably compared against current revisions to detect
/// state changes since preparation; confirm with callers.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct StateRevision {
    /// Catalog revision.
    pub catalog_revision: u64,
    /// Session state revision (see `Session::state_revision`).
    pub session_state_revision: u64,
}
1034
/// Execution state of a [`Portal`].
pub enum PortalState {
    /// The portal has not been executed yet.
    NotStarted,
    /// Execution has started and rows may remain to be returned.
    /// NOTE(review): `None` presumably means the rows are temporarily taken
    /// out; confirm.
    InProgress(Option<InProgressRows>),
    /// Execution has finished.
    /// NOTE(review): the string is presumably the command completion tag;
    /// confirm.
    Completed(Option<String>),
}
1047
/// Rows of a portal whose execution is in progress.
pub struct InProgressRows {
    /// The batch currently being iterated, if any.
    pub current: Option<Box<dyn RowIterator + Send + Sync>>,
    /// Stream of the remaining batches.
    pub remaining: RecordFirstRowStream,
}
1055
1056impl InProgressRows {
1057 pub fn new(remaining: RecordFirstRowStream) -> Self {
1059 Self {
1060 current: None,
1061 remaining,
1062 }
1063 }
1064
1065 pub fn no_more_rows(&self) -> bool {
1068 self.remaining.no_more_rows && self.current.is_none()
1069 }
1070}
1071
/// A receiver of peek responses.
pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
1074
/// Timestamps of events in a statement's lifecycle.
#[derive(Debug, Clone)]
pub struct LifecycleTimestamps {
    /// When the statement was received, in epoch milliseconds.
    pub received: EpochMillis,
}
1085
1086impl LifecycleTimestamps {
1087 pub fn new(received: EpochMillis) -> Self {
1089 Self { received }
1090 }
1091}
1092
/// Status of a session's current transaction.
#[derive(Debug)]
pub enum TransactionStatus<T> {
    /// No transaction is in progress.
    Default,
    /// An implicit transaction started for a single statement
    /// (see `Session::start_transaction_implicit`).
    Started(Transaction<T>),
    /// An explicit transaction (see `Session::start_transaction`).
    InTransaction(Transaction<T>),
    /// An implicit transaction spanning multiple statements.
    InTransactionImplicit(Transaction<T>),
    /// A transaction that has failed (see `Session::fail_transaction`).
    Failed(Transaction<T>),
}
1118
impl<T: TimestampManipulation> TransactionStatus<T> {
    /// Consumes the status and returns the ops and write locks of an active
    /// transaction.
    ///
    /// Returns `(None, None)` for `Default` or `Failed`.
    pub fn into_ops_and_lock_guard(self) -> (Option<TransactionOps<T>>, Option<WriteLocks>) {
        match self {
            TransactionStatus::Default | TransactionStatus::Failed(_) => (None, None),
            TransactionStatus::Started(txn)
            | TransactionStatus::InTransaction(txn)
            | TransactionStatus::InTransactionImplicit(txn) => {
                (Some(txn.ops), txn.write_lock_guards)
            }
        }
    }

    /// Returns the transaction, including a failed one; `None` for `Default`.
    pub fn inner(&self) -> Option<&Transaction<T>> {
        match self {
            TransactionStatus::Default => None,
            TransactionStatus::Started(txn)
            | TransactionStatus::InTransaction(txn)
            | TransactionStatus::InTransactionImplicit(txn)
            | TransactionStatus::Failed(txn) => Some(txn),
        }
    }

    /// Mutable version of [`TransactionStatus::inner`].
    pub fn inner_mut(&mut self) -> Option<&mut Transaction<T>> {
        match self {
            TransactionStatus::Default => None,
            TransactionStatus::Started(txn)
            | TransactionStatus::InTransaction(txn)
            | TransactionStatus::InTransactionImplicit(txn)
            | TransactionStatus::Failed(txn) => Some(txn),
        }
    }

    /// Whether the transaction's ops are DDL ops. Also checked for failed
    /// transactions.
    pub fn is_ddl(&self) -> bool {
        match self {
            TransactionStatus::Default => false,
            TransactionStatus::Started(txn)
            | TransactionStatus::InTransaction(txn)
            | TransactionStatus::InTransactionImplicit(txn)
            | TransactionStatus::Failed(txn) => {
                matches!(txn.ops, TransactionOps::DDL { .. })
            }
        }
    }

    /// Whether the transaction is implicit (`Started` or
    /// `InTransactionImplicit`).
    pub fn is_implicit(&self) -> bool {
        match self {
            TransactionStatus::Started(_) | TransactionStatus::InTransactionImplicit(_) => true,
            TransactionStatus::Default
            | TransactionStatus::InTransaction(_)
            | TransactionStatus::Failed(_) => false,
        }
    }

    /// Whether the transaction spans multiple statements (`InTransaction` or
    /// `InTransactionImplicit`).
    pub fn is_in_multi_statement_transaction(&self) -> bool {
        match self {
            TransactionStatus::InTransaction(_) | TransactionStatus::InTransactionImplicit(_) => {
                true
            }
            TransactionStatus::Default
            | TransactionStatus::Started(_)
            | TransactionStatus::Failed(_) => false,
        }
    }

    /// Whether this is a multi-statement transaction and `when` is
    /// `QueryWhen::Immediately`.
    pub fn in_immediate_multi_stmt_txn(&self, when: &QueryWhen) -> bool {
        self.is_in_multi_statement_transaction() && when == &QueryWhen::Immediately
    }

    /// Tries to give `guards` to the transaction (including a failed one).
    ///
    /// # Errors
    ///
    /// Returns the already granted locks if the transaction holds some.
    ///
    /// # Panics
    ///
    /// Panics if no transaction has started.
    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
        match self {
            TransactionStatus::Default => panic!("cannot grant write lock to txn not yet started"),
            TransactionStatus::Started(txn)
            | TransactionStatus::InTransaction(txn)
            | TransactionStatus::InTransactionImplicit(txn)
            | TransactionStatus::Failed(txn) => txn.try_grant_write_locks(guards),
        }
    }

    /// Returns the write locks held by the transaction, if any.
    pub fn write_locks(&self) -> Option<&WriteLocks> {
        match self {
            TransactionStatus::Default => None,
            TransactionStatus::Started(txn)
            | TransactionStatus::InTransaction(txn)
            | TransactionStatus::InTransactionImplicit(txn)
            | TransactionStatus::Failed(txn) => txn.write_lock_guards.as_ref(),
        }
    }

    /// Returns the timeline chosen by the transaction's peeks, if any.
    pub fn timeline(&self) -> Option<Timeline> {
        match self {
            TransactionStatus::Default => None,
            TransactionStatus::Started(txn)
            | TransactionStatus::InTransaction(txn)
            | TransactionStatus::InTransactionImplicit(txn)
            | TransactionStatus::Failed(txn) => txn.timeline(),
        }
    }

    /// Returns the cluster the transaction's peeks run on, if any.
    pub fn cluster(&self) -> Option<ClusterId> {
        match self {
            TransactionStatus::Default => None,
            TransactionStatus::Started(txn)
            | TransactionStatus::InTransaction(txn)
            | TransactionStatus::InTransactionImplicit(txn)
            | TransactionStatus::Failed(txn) => txn.cluster(),
        }
    }

    /// Returns the catalog state carried by DDL ops, if the transaction's ops
    /// are DDL.
    pub fn catalog_state(&self) -> Option<&CatalogState> {
        match self.inner() {
            Some(Transaction {
                ops: TransactionOps::DDL { state, .. },
                ..
            }) => Some(state),
            _ => None,
        }
    }

    /// Whether a transaction exists and has ops other than
    /// `TransactionOps::None`.
    pub fn contains_ops(&self) -> bool {
        match self.inner() {
            Some(txn) => txn.contains_ops(),
            None => false,
        }
    }

    /// Whether writes may still be added to the transaction.
    ///
    /// # Panics
    ///
    /// Panics if the status is `Default` or `Failed`.
    pub fn allows_writes(&self) -> bool {
        match self {
            TransactionStatus::Started(Transaction { ops, access, .. })
            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
                match ops {
                    TransactionOps::None => access != &Some(TransactionAccessMode::ReadOnly),
                    TransactionOps::Peeks { determination, .. } => {
                        // Writes are only allowed if the peeks so far have not
                        // fixed a read timestamp.
                        !determination.timestamp_context.contains_timestamp()
                    }
                    TransactionOps::Subscribe => false,
                    TransactionOps::Writes(_) => true,
                    TransactionOps::SingleStatement { .. } => false,
                    TransactionOps::DDL { .. } => false,
                }
            }
            TransactionStatus::Default | TransactionStatus::Failed(_) => {
                unreachable!()
            }
        }
    }

    /// Merges `add_ops` into the transaction's ops.
    ///
    /// The merge rules depend on the current ops:
    /// - `None`: replaced by `add_ops`.
    /// - Peeks: merge with more peeks (same cluster, and the same timeline and
    ///   timestamp once both have one); become writes if no timestamp has been
    ///   chosen yet.
    /// - Writes: accumulate more writes; accept peeks that have no timestamp,
    ///   leaving the ops unchanged.
    /// - DDL: merge with DDL of the same revision.
    ///
    /// # Errors
    ///
    /// - [`AdapterError::ReadOnlyTransaction`]: writes are added to a read-only
    ///   transaction with no ops, or to peeks that already fixed a timestamp,
    ///   or non-peek, non-write ops are added to peeks.
    /// - `SubscribeOnlyTransaction`, `WriteOnlyTransaction`,
    ///   `SingleStatementTransaction`, `DDLOnlyTransaction`: the ops are
    ///   incompatible with the existing ones.
    /// - `DDLTransactionRace`: the DDL revisions differ.
    ///
    /// # Panics
    ///
    /// Panics in any of these cases:
    /// - the status is `Default` or `Failed`;
    /// - merged peeks disagree on cluster, timeline, or chosen timestamp;
    /// - writes are merged into a read-only transaction.
    pub fn add_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
        match self {
            TransactionStatus::Started(Transaction { ops, access, .. })
            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
                match ops {
                    TransactionOps::None => {
                        if matches!(access, Some(TransactionAccessMode::ReadOnly))
                            && matches!(add_ops, TransactionOps::Writes(_))
                        {
                            return Err(AdapterError::ReadOnlyTransaction);
                        }
                        *ops = add_ops;
                    }
                    TransactionOps::Peeks {
                        determination,
                        cluster_id,
                        requires_linearization,
                    } => match add_ops {
                        TransactionOps::Peeks {
                            determination: add_timestamp_determination,
                            cluster_id: add_cluster_id,
                            requires_linearization: add_requires_linearization,
                        } => {
                            assert_eq!(*cluster_id, add_cluster_id);
                            // Once both sides have a timestamp they must agree;
                            // otherwise adopt whichever side has one.
                            match (
                                &determination.timestamp_context,
                                &add_timestamp_determination.timestamp_context,
                            ) {
                                (
                                    TimestampContext::TimelineTimestamp {
                                        timeline: txn_timeline,
                                        chosen_ts: txn_ts,
                                        oracle_ts: _,
                                    },
                                    TimestampContext::TimelineTimestamp {
                                        timeline: add_timeline,
                                        chosen_ts: add_ts,
                                        oracle_ts: _,
                                    },
                                ) => {
                                    assert_eq!(txn_timeline, add_timeline);
                                    assert_eq!(txn_ts, add_ts);
                                }
                                (TimestampContext::NoTimestamp, _) => {
                                    *determination = add_timestamp_determination
                                }
                                (_, TimestampContext::NoTimestamp) => {}
                            };
                            // Linearization requirements only ever escalate
                            // from `NotRequired` to `Required`.
                            if matches!(requires_linearization, RequireLinearization::NotRequired)
                                && matches!(
                                    add_requires_linearization,
                                    RequireLinearization::Required
                                )
                            {
                                *requires_linearization = add_requires_linearization;
                            }
                        }
                        // Peeks that have not chosen a timestamp may be
                        // followed by writes, which replace them.
                        writes @ TransactionOps::Writes(..)
                            if !determination.timestamp_context.contains_timestamp() =>
                        {
                            *ops = writes;
                        }
                        _ => return Err(AdapterError::ReadOnlyTransaction),
                    },
                    TransactionOps::Subscribe => {
                        return Err(AdapterError::SubscribeOnlyTransaction);
                    }
                    TransactionOps::Writes(txn_writes) => match add_ops {
                        TransactionOps::Writes(mut add_writes) => {
                            // A read-only transaction should never have
                            // accumulated writes in the first place.
                            assert!(!matches!(access, Some(TransactionAccessMode::ReadOnly)));
                            txn_writes.append(&mut add_writes);
                        }
                        // Peeks without a timestamp are accepted and leave the
                        // existing writes unchanged.
                        TransactionOps::Peeks { determination, .. }
                            if !determination.timestamp_context.contains_timestamp() => {}
                        _ => {
                            return Err(AdapterError::WriteOnlyTransaction);
                        }
                    },
                    TransactionOps::SingleStatement { .. } => {
                        return Err(AdapterError::SingleStatementTransaction);
                    }
                    TransactionOps::DDL {
                        ops: og_ops,
                        revision: og_revision,
                        state: og_state,
                        side_effects,
                    } => match add_ops {
                        TransactionOps::DDL {
                            ops: new_ops,
                            revision: new_revision,
                            side_effects: mut net_new_side_effects,
                            state: new_state,
                        } => {
                            if *og_revision != new_revision {
                                return Err(AdapterError::DDLTransactionRace);
                            }
                            // `new_ops` replaces rather than extends the
                            // existing ops. NOTE(review): presumably it already
                            // contains the full accumulated op list; confirm.
                            if !new_ops.is_empty() {
                                *og_ops = new_ops;
                                *og_state = new_state;
                            }
                            side_effects.append(&mut net_new_side_effects);
                        }
                        _ => return Err(AdapterError::DDLOnlyTransaction),
                    },
                }
            }
            TransactionStatus::Default | TransactionStatus::Failed(_) => {
                unreachable!()
            }
        }
        Ok(())
    }
}
1427
/// Session-local transaction identifier, assigned from a wrapping counter.
pub type TransactionId = u64;
1430
1431impl<T> Default for TransactionStatus<T> {
1432 fn default() -> Self {
1433 TransactionStatus::Default
1434 }
1435}
1436
/// State of a transaction in a session.
#[derive(Debug)]
pub struct Transaction<T> {
    /// Plan context created when the transaction started.
    pub pcx: PlanContext,
    /// Operations performed so far in the transaction.
    pub ops: TransactionOps<T>,
    /// Session-local identifier of the transaction.
    pub id: TransactionId,
    /// Write locks granted to the transaction, if any. They are set at most
    /// once, by `try_grant_write_locks`.
    write_lock_guards: Option<WriteLocks>,
    /// Access mode requested for the transaction, if any.
    access: Option<TransactionAccessMode>,
}
1454
1455impl<T> Transaction<T> {
1456 fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
1459 match &mut self.write_lock_guards {
1460 Some(existing) => Err(existing),
1461 locks @ None => {
1462 *locks = Some(guards);
1463 Ok(())
1464 }
1465 }
1466 }
1467
1468 fn timeline(&self) -> Option<Timeline> {
1470 match &self.ops {
1471 TransactionOps::Peeks {
1472 determination:
1473 TimestampDetermination {
1474 timestamp_context: TimestampContext::TimelineTimestamp { timeline, .. },
1475 ..
1476 },
1477 ..
1478 } => Some(timeline.clone()),
1479 TransactionOps::Peeks { .. }
1480 | TransactionOps::None
1481 | TransactionOps::Subscribe
1482 | TransactionOps::Writes(_)
1483 | TransactionOps::SingleStatement { .. }
1484 | TransactionOps::DDL { .. } => None,
1485 }
1486 }
1487
1488 pub fn cluster(&self) -> Option<ClusterId> {
1490 match &self.ops {
1491 TransactionOps::Peeks { cluster_id, .. } => Some(cluster_id.clone()),
1492 TransactionOps::None
1493 | TransactionOps::Subscribe
1494 | TransactionOps::Writes(_)
1495 | TransactionOps::SingleStatement { .. }
1496 | TransactionOps::DDL { .. } => None,
1497 }
1498 }
1499
1500 fn contains_ops(&self) -> bool {
1502 !matches!(self.ops, TransactionOps::None)
1503 }
1504}
1505
#[derive(Debug, Clone, Copy)]
/// Coarse summary of a transaction's state, convertible to a single status byte
/// (NOTE(review): presumably the PostgreSQL ReadyForQuery status indicator).
pub enum TransactionCode {
    /// Not inside a transaction; encoded as `b'I'`.
    Idle,
    /// Inside a transaction; encoded as `b'T'`.
    InTransaction,
    /// Inside a failed transaction; encoded as `b'E'`.
    Failed,
}

impl From<TransactionCode> for u8 {
    /// Maps each code to its ASCII status byte.
    fn from(code: TransactionCode) -> Self {
        use TransactionCode::{Failed, Idle, InTransaction};
        match code {
            Idle => b'I',
            InTransaction => b'T',
            Failed => b'E',
        }
    }
}

impl From<TransactionCode> for String {
    /// Renders the code as a one-character string holding its status byte.
    fn from(code: TransactionCode) -> Self {
        let status_byte = u8::from(code);
        String::from(char::from(status_byte))
    }
}
1532
1533impl<T> From<&TransactionStatus<T>> for TransactionCode {
1534 fn from(status: &TransactionStatus<T>) -> TransactionCode {
1536 match status {
1537 TransactionStatus::Default => TransactionCode::Idle,
1538 TransactionStatus::Started(_) => TransactionCode::InTransaction,
1539 TransactionStatus::InTransaction(_) => TransactionCode::InTransaction,
1540 TransactionStatus::InTransactionImplicit(_) => TransactionCode::InTransaction,
1541 TransactionStatus::Failed(_) => TransactionCode::Failed,
1542 }
1543 }
1544}
1545
#[derive(Derivative)]
/// The operations performed so far within a transaction.
///
/// The variant records which kind of work the transaction has done; adding
/// incompatible operations to an existing transaction is rejected with an
/// `AdapterError`.
#[derivative(Debug)]
pub enum TransactionOps<T> {
    /// No operations have been performed yet. This is the default.
    None,
    /// The transaction has performed peeks (reads).
    Peeks {
        /// Timestamp determination for the transaction's reads.
        determination: TimestampDetermination<T>,
        /// Cluster recorded for the transaction's peeks.
        cluster_id: ClusterId,
        /// Whether the transaction's reads require linearization.
        requires_linearization: RequireLinearization,
    },
    /// The transaction contains a SUBSCRIBE. Adding further operations fails
    /// with `AdapterError::SubscribeOnlyTransaction`.
    Subscribe,
    /// The transaction has performed writes. Additional writes are appended to
    /// this list.
    Writes(Vec<WriteOp>),
    /// A statement that must be the only one in its transaction. Adding further
    /// operations fails with `AdapterError::SingleStatementTransaction`.
    SingleStatement {
        /// The statement.
        stmt: Arc<Statement<Raw>>,
        /// Parameters bound to `stmt`.
        params: mz_sql::plan::Params,
    },
    /// The transaction has performed DDL.
    DDL {
        /// Catalog operations accumulated so far.
        ops: Vec<crate::catalog::Op>,
        /// Catalog state paired with `ops`; replaced together with `ops`
        /// whenever a non-empty batch of new ops is added.
        state: CatalogState,
        /// Deferred closures, each invoked with the `Coordinator` and an
        /// optional `ExecuteContext`. Excluded from the `Debug` output.
        #[derivative(Debug = "ignore")]
        side_effects: Vec<
            Box<
                dyn for<'a> FnOnce(
                        &'a mut Coordinator,
                        Option<&'a mut ExecuteContext>,
                    ) -> Pin<Box<dyn Future<Output = ()> + 'a>>
                    + Send
                    + Sync,
            >,
        >,
        /// Revision that subsequently added DDL ops must match; a mismatch
        /// fails with `AdapterError::DDLTransactionRace`.
        revision: u64,
    },
}
1605
1606impl<T> TransactionOps<T> {
1607 fn timestamp_determination(self) -> Option<TimestampDetermination<T>> {
1608 match self {
1609 TransactionOps::Peeks { determination, .. } => Some(determination),
1610 TransactionOps::None
1611 | TransactionOps::Subscribe
1612 | TransactionOps::Writes(_)
1613 | TransactionOps::SingleStatement { .. }
1614 | TransactionOps::DDL { .. } => None,
1615 }
1616 }
1617}
1618
1619impl<T> Default for TransactionOps<T> {
1620 fn default() -> Self {
1621 Self::None
1622 }
1623}
1624
#[derive(Debug, Clone, PartialEq)]
/// A single write: the data in `rows` destined for the item `id`.
pub struct WriteOp {
    /// The item being written to.
    pub id: CatalogItemId,
    /// The data to write.
    pub rows: TableData,
}
1633
#[derive(Debug)]
/// Whether a transaction's reads require linearization.
pub enum RequireLinearization {
    /// Linearization is required.
    Required,
    /// Linearization is not required.
    NotRequired,
}
1642
1643impl From<&ExplainContext> for RequireLinearization {
1644 fn from(ctx: &ExplainContext) -> Self {
1645 match ctx {
1646 ExplainContext::None | ExplainContext::PlanInsightsNotice(_) => {
1647 RequireLinearization::Required
1648 }
1649 _ => RequireLinearization::NotRequired,
1650 }
1651 }
1652}
1653
#[derive(Debug)]
/// A set of held write-lock guards keyed by item, plus the connection that
/// holds them.
///
/// Built with [`WriteLocks::builder`]. Dropping a non-empty `WriteLocks` logs
/// the release, and the guards are dropped with it.
pub struct WriteLocks {
    /// Held lock guards, one per item.
    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
    /// Connection holding the locks; included in log messages.
    conn_id: ConnectionId,
}
1663
1664impl WriteLocks {
1665 pub fn builder(sources: impl IntoIterator<Item = CatalogItemId>) -> WriteLocksBuilder {
1670 let locks = sources.into_iter().map(|gid| (gid, None)).collect();
1671 WriteLocksBuilder { locks }
1672 }
1673
1674 pub fn validate(
1677 self,
1678 collections: impl Iterator<Item = CatalogItemId>,
1679 ) -> Result<Self, BTreeSet<CatalogItemId>> {
1680 let mut missing = BTreeSet::new();
1681 for collection in collections {
1682 if !self.locks.contains_key(&collection) {
1683 missing.insert(collection);
1684 }
1685 }
1686
1687 if missing.is_empty() {
1688 Ok(self)
1689 } else {
1690 drop(self);
1692 Err(missing)
1693 }
1694 }
1695}
1696
1697impl Drop for WriteLocks {
1698 fn drop(&mut self) {
1699 if !self.locks.is_empty() {
1701 tracing::info!(
1702 conn_id = %self.conn_id,
1703 locks = ?self.locks,
1704 "dropping write locks",
1705 );
1706 }
1707 }
1708}
1709
#[derive(Debug)]
/// Accumulates write-lock guards for a set of items until all are acquired.
///
/// Created with [`WriteLocks::builder`] and finished with
/// [`WriteLocksBuilder::all_or_nothing`].
pub struct WriteLocksBuilder {
    /// Per-item lock slot; `None` until that item's guard has been inserted.
    locks: BTreeMap<CatalogItemId, Option<tokio::sync::OwnedMutexGuard<()>>>,
}
1717
1718impl WriteLocksBuilder {
1719 pub fn insert_lock(&mut self, id: CatalogItemId, lock: tokio::sync::OwnedMutexGuard<()>) {
1721 self.locks.insert(id, Some(lock));
1722 }
1723
1724 pub fn all_or_nothing(self, conn_id: &ConnectionId) -> Result<WriteLocks, CatalogItemId> {
1729 let (locks, missing): (BTreeMap<_, _>, BTreeSet<_>) =
1730 self.locks
1731 .into_iter()
1732 .partition_map(|(gid, lock)| match lock {
1733 Some(lock) => itertools::Either::Left((gid, lock)),
1734 None => itertools::Either::Right(gid),
1735 });
1736
1737 match missing.iter().next() {
1738 None => {
1739 tracing::info!(%conn_id, ?locks, "acquired write locks");
1740 Ok(WriteLocks {
1741 locks,
1742 conn_id: conn_id.clone(),
1743 })
1744 }
1745 Some(gid) => {
1746 tracing::info!(?missing, "failed to acquire write locks");
1747 drop(locks);
1749 Err(*gid)
1750 }
1751 }
1752 }
1753}
1754
#[derive(Debug, Default)]
/// Write locks gathered from one or more [`WriteLocks`] for a group commit.
///
/// Locks are added with `merge`. Dropping a non-empty instance logs the release,
/// and the guards are dropped with it.
pub(crate) struct GroupCommitWriteLocks {
    /// Held lock guards, one per item.
    locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
}
1807
1808impl GroupCommitWriteLocks {
1809 pub fn merge(&mut self, mut locks: WriteLocks) {
1811 let existing = std::mem::take(&mut locks.locks);
1815 self.locks.extend(existing);
1816 }
1817
1818 pub fn missing_locks(
1820 &self,
1821 writes: impl Iterator<Item = CatalogItemId>,
1822 ) -> BTreeSet<CatalogItemId> {
1823 let mut missing = BTreeSet::new();
1824 for write in writes {
1825 if !self.locks.contains_key(&write) {
1826 missing.insert(write);
1827 }
1828 }
1829 missing
1830 }
1831}
1832
1833impl Drop for GroupCommitWriteLocks {
1834 fn drop(&mut self) {
1835 if !self.locks.is_empty() {
1836 tracing::info!(
1837 locks = ?self.locks,
1838 "dropping group commit write locks",
1839 );
1840 }
1841 }
1842}