1#![warn(missing_docs)]
13
14use std::collections::btree_map::Entry;
15use std::collections::{BTreeMap, BTreeSet};
16use std::fmt::Debug;
17use std::future::Future;
18use std::mem;
19use std::net::IpAddr;
20use std::sync::Arc;
21
22use chrono::{DateTime, Utc};
23use derivative::Derivative;
24use itertools::Itertools;
25use mz_adapter_types::connection::ConnectionId;
26use mz_build_info::{BuildInfo, DUMMY_BUILD_INFO};
27use mz_controller_types::ClusterId;
28use mz_ore::metrics::{MetricsFutureExt, MetricsRegistry};
29use mz_ore::now::{EpochMillis, NowFn};
30use mz_pgwire_common::Format;
31use mz_repr::role_id::RoleId;
32use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
33use mz_repr::{CatalogItemId, Datum, Row, RowIterator, ScalarType, TimestampManipulation};
34use mz_sql::ast::{AstInfo, Raw, Statement, TransactionAccessMode};
35use mz_sql::plan::{Params, PlanContext, QueryWhen, StatementDesc};
36use mz_sql::session::metadata::SessionMetadata;
37use mz_sql::session::user::{
38 INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER, RoleMetadata, SYSTEM_USER, User,
39};
40use mz_sql::session::vars::IsolationLevel;
41pub use mz_sql::session::vars::{
42 DEFAULT_DATABASE_NAME, EndTransactionAction, SERVER_MAJOR_VERSION, SERVER_MINOR_VERSION,
43 SERVER_PATCH_VERSION, SessionVars, Var,
44};
45use mz_sql_parser::ast::TransactionIsolationLevel;
46use mz_storage_client::client::TableData;
47use mz_storage_types::sources::Timeline;
48use qcell::{QCell, QCellOwner};
49use rand::Rng;
50use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
51use tokio::sync::watch;
52use uuid::Uuid;
53
54use crate::AdapterNotice;
55use crate::catalog::CatalogState;
56use crate::client::RecordFirstRowStream;
57use crate::coord::ExplainContext;
58use crate::coord::appends::BuiltinTableAppendNotify;
59use crate::coord::in_memory_oracle::InMemoryTimestampOracle;
60use crate::coord::peek::PeekResponseUnary;
61use crate::coord::statement_logging::PreparedStatementLoggingInfo;
62use crate::coord::timestamp_selection::{TimestampContext, TimestampDetermination};
63use crate::error::AdapterError;
64use crate::metrics::{Metrics, SessionMetrics};
65
/// Connection ID reserved for the session built by [`Session::dummy`].
///
/// [`Session::new`] asserts that a real session is never configured with it.
const DUMMY_CONNECTION_ID: ConnectionId = ConnectionId::Static(0);
67
/// Per-connection adapter state: prepared statements, portals, the current
/// transaction, session variables, and pending notices.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Session<T = mz_repr::Timestamp>
where
    T: Debug + Clone + Send + Sync,
{
    /// Identifier of the connection this session belongs to.
    conn_id: ConnectionId,
    /// UUID supplied through [`SessionConfig`]; also recorded in statement
    /// logging info created by this session.
    uuid: Uuid,
    /// Prepared statements, keyed by name.
    prepared_statements: BTreeMap<String, PreparedStatement>,
    /// Portals, keyed by name. Cleared whenever the transaction is cleared.
    portals: BTreeMap<String, Portal>,
    /// Status of the current transaction, if any.
    transaction: TransactionStatus<T>,
    // NOTE(review): in the visible code this field is only ever set to `None`;
    // the plan context that is read lives on `Transaction::pcx`. Confirm whether
    // this field is still needed.
    pcx: Option<PlanContext>,
    /// Metrics handle for this session.
    metrics: SessionMetrics,
    /// Pending builtin table append notification, installed by
    /// `set_builtin_table_updates` and consumed by `clear_builtin_table_updates`.
    #[derivative(Debug = "ignore")]
    builtin_updates: Option<BuiltinTableAppendNotify>,

    /// Role metadata for the session. `None` until `initialize_role_metadata`
    /// is called; the `role_metadata()` accessor panics while it is unset.
    role_metadata: Option<RoleMetadata>,
    /// IP address of the client, when known.
    client_ip: Option<IpAddr>,
    /// Session variables.
    vars: SessionVars,
    /// Sending half of the notice channel; clones are handed out by
    /// `retain_notice_transmitter`.
    notices_tx: mpsc::UnboundedSender<AdapterNotice>,
    /// Receiving half of the notice channel. Because the session also owns a
    /// sender, the channel never reports itself closed to this receiver.
    notices_rx: mpsc::UnboundedReceiver<AdapterNotice>,
    /// ID handed to the next transaction; incremented with wrapping arithmetic.
    next_transaction_id: TransactionId,
    /// Random value generated once when the session is constructed.
    secret_key: u32,
    /// Optional watch channel carrying external user metadata; polled by
    /// `apply_external_metadata_updates`.
    external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    /// Owner token for the `QCell`s wrapping statement logging info created by
    /// this session.
    #[derivative(Debug = "ignore")]
    qcell_owner: QCellOwner,
    /// In-memory timestamp oracles local to this session, one per timeline,
    /// created lazily by `ensure_timestamp_oracle`.
    session_oracles: BTreeMap<Timeline, InMemoryTimestampOracle<T, NowFn<T>>>,
}
122
123impl<T> SessionMetadata for Session<T>
124where
125 T: Debug + Clone + Send + Sync,
126 T: TimestampManipulation,
127{
128 fn conn_id(&self) -> &ConnectionId {
129 &self.conn_id
130 }
131
132 fn client_ip(&self) -> Option<&IpAddr> {
133 self.client_ip.as_ref()
134 }
135
136 fn pcx(&self) -> &PlanContext {
137 &self
138 .transaction()
139 .inner()
140 .expect("no active transaction")
141 .pcx
142 }
143
144 fn role_metadata(&self) -> &RoleMetadata {
145 self.role_metadata
146 .as_ref()
147 .expect("role_metadata invariant violated")
148 }
149
150 fn vars(&self) -> &SessionVars {
151 &self.vars
152 }
153}
154
/// Owned snapshot of the values exposed through [`SessionMetadata`], as
/// produced by [`Session::meta`].
#[derive(Debug)]
pub struct SessionMeta {
    /// Connection ID copied from the session.
    conn_id: ConnectionId,
    /// Client IP copied from the session, when known.
    client_ip: Option<IpAddr>,
    /// Plan context cloned from the session's transaction at snapshot time.
    pcx: PlanContext,
    /// Role metadata cloned from the session.
    role_metadata: RoleMetadata,
    /// Session variables cloned from the session.
    vars: SessionVars,
}
165
/// Serves the snapshotted values; unlike the `Session` implementation, none of
/// these accessors can panic because every field is always populated.
impl SessionMetadata for SessionMeta {
    /// Returns the snapshotted session variables.
    fn vars(&self) -> &SessionVars {
        &self.vars
    }

    /// Returns the snapshotted connection ID.
    fn conn_id(&self) -> &ConnectionId {
        &self.conn_id
    }

    /// Returns the snapshotted client IP, when known.
    fn client_ip(&self) -> Option<&IpAddr> {
        self.client_ip.as_ref()
    }

    /// Returns the snapshotted plan context.
    fn pcx(&self) -> &PlanContext {
        &self.pcx
    }

    /// Returns the snapshotted role metadata.
    fn role_metadata(&self) -> &RoleMetadata {
        &self.role_metadata
    }
}
187
/// Parameters needed to construct a [`Session`].
#[derive(Debug, Clone)]
pub struct SessionConfig {
    /// Connection ID for the session.
    pub conn_id: ConnectionId,
    /// UUID identifying the session.
    pub uuid: Uuid,
    /// IP address of the client, when known.
    pub client_ip: Option<IpAddr>,
    /// Name of the user. Also used to look up a default cluster in
    /// `INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER`.
    pub user: String,
    /// Optional watch channel with external user metadata. Its current value is
    /// read when the session is built; later values are applied by
    /// `Session::apply_external_metadata_updates`.
    pub external_metadata_rx: Option<watch::Receiver<ExternalUserMetadata>>,
    /// Optional internal metadata stored on the session's `User`.
    pub internal_user_metadata: Option<InternalUserMetadata>,
    /// Optional Helm chart version, forwarded to `SessionVars::new_unchecked`.
    pub helm_chart_version: Option<String>,
}
212
213impl<T: TimestampManipulation> Session<T> {
    /// Creates a session for a real client connection.
    ///
    /// # Panics
    ///
    /// Panics if `config.conn_id` equals `DUMMY_CONNECTION_ID`, which is
    /// reserved for [`Session::dummy`].
    pub(crate) fn new(
        build_info: &'static BuildInfo,
        config: SessionConfig,
        metrics: SessionMetrics,
    ) -> Session<T> {
        assert_ne!(config.conn_id, DUMMY_CONNECTION_ID);
        Self::new_internal(build_info, config, metrics)
    }
223
224 pub fn meta(&self) -> SessionMeta {
227 SessionMeta {
228 conn_id: self.conn_id().clone(),
229 client_ip: self.client_ip().copied(),
230 pcx: self.pcx().clone(),
231 role_metadata: self.role_metadata().clone(),
232 vars: self.vars.clone(),
233 }
234
235 }
237
238 pub(crate) fn mint_logging<A: AstInfo>(
248 &self,
249 raw_sql: String,
250 stmt: Option<&Statement<A>>,
251 now: EpochMillis,
252 ) -> Arc<QCell<PreparedStatementLoggingInfo>> {
253 Arc::new(QCell::new(
254 &self.qcell_owner,
255 PreparedStatementLoggingInfo::still_to_log(
256 raw_sql,
257 stmt,
258 now,
259 "".to_string(),
260 self.uuid,
261 false,
262 ),
263 ))
264 }
265
    /// Returns mutable access to the contents of `cell` through this session's
    /// `QCellOwner`; the borrow is tied to the mutable borrow of the session.
    pub(crate) fn qcell_rw<'a, T2: 'a>(&'a mut self, cell: &'a Arc<QCell<T2>>) -> &'a mut T2 {
        self.qcell_owner.rw(&*cell)
    }
269
    /// Returns the UUID this session was configured with.
    pub fn uuid(&self) -> Uuid {
        self.uuid
    }
275
276 pub fn dummy() -> Session<T> {
281 let registry = MetricsRegistry::new();
282 let metrics = Metrics::register_into(®istry);
283 let metrics = metrics.session_metrics();
284 let mut dummy = Self::new_internal(
285 &DUMMY_BUILD_INFO,
286 SessionConfig {
287 conn_id: DUMMY_CONNECTION_ID,
288 uuid: Uuid::new_v4(),
289 user: SYSTEM_USER.name.clone(),
290 client_ip: None,
291 external_metadata_rx: None,
292 internal_user_metadata: None,
293 helm_chart_version: None,
294 },
295 metrics,
296 );
297 dummy.initialize_role_metadata(RoleId::User(0));
298 dummy
299 }
300
301 fn new_internal(
302 build_info: &'static BuildInfo,
303 SessionConfig {
304 conn_id,
305 uuid,
306 user,
307 client_ip,
308 mut external_metadata_rx,
309 internal_user_metadata,
310 helm_chart_version,
311 }: SessionConfig,
312 metrics: SessionMetrics,
313 ) -> Session<T> {
314 let (notices_tx, notices_rx) = mpsc::unbounded_channel();
315 let default_cluster = INTERNAL_USER_NAME_TO_DEFAULT_CLUSTER.get(&user);
316 let user = User {
317 name: user,
318 internal_metadata: internal_user_metadata,
319 external_metadata: external_metadata_rx
320 .as_mut()
321 .map(|rx| rx.borrow_and_update().clone()),
322 };
323 let mut vars = SessionVars::new_unchecked(build_info, user, helm_chart_version);
324 if let Some(default_cluster) = default_cluster {
325 vars.set_cluster(default_cluster.clone());
326 }
327 Session {
328 conn_id,
329 uuid,
330 transaction: TransactionStatus::Default,
331 pcx: None,
332 metrics,
333 builtin_updates: None,
334 prepared_statements: BTreeMap::new(),
335 portals: BTreeMap::new(),
336 role_metadata: None,
337 client_ip,
338 vars,
339 notices_tx,
340 notices_rx,
341 next_transaction_id: 0,
342 secret_key: rand::thread_rng().r#gen(),
343 external_metadata_rx,
344 qcell_owner: QCellOwner::new(),
345 session_oracles: BTreeMap::new(),
346 }
347 }
348
    /// Returns the random secret key generated when the session was created.
    pub fn secret_key(&self) -> u32 {
        self.secret_key
    }
353
354 fn new_pcx(&self, mut wall_time: DateTime<Utc>) -> PlanContext {
355 if let Some(mock_time) = self.vars().unsafe_new_transaction_wall_time() {
356 wall_time = *mock_time;
357 }
358 PlanContext::new(wall_time)
359 }
360
361 pub fn start_transaction(
364 &mut self,
365 wall_time: DateTime<Utc>,
366 access: Option<TransactionAccessMode>,
367 isolation_level: Option<TransactionIsolationLevel>,
368 ) -> Result<(), AdapterError> {
369 if let Some(txn) = self.transaction.inner() {
371 let read_write_prohibited = match txn.ops {
375 TransactionOps::Peeks { .. } | TransactionOps::Subscribe => {
376 txn.access == Some(TransactionAccessMode::ReadOnly)
377 }
378 TransactionOps::None
379 | TransactionOps::Writes(_)
380 | TransactionOps::SingleStatement { .. }
381 | TransactionOps::DDL { .. } => false,
382 };
383
384 if read_write_prohibited && access == Some(TransactionAccessMode::ReadWrite) {
385 return Err(AdapterError::ReadWriteUnavailable);
386 }
387 }
388
389 match std::mem::take(&mut self.transaction) {
390 TransactionStatus::Default => {
391 let id = self.next_transaction_id;
392 self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
393 self.transaction = TransactionStatus::InTransaction(Transaction {
394 pcx: self.new_pcx(wall_time),
395 ops: TransactionOps::None,
396 write_lock_guards: None,
397 access,
398 id,
399 });
400 }
401 TransactionStatus::Started(mut txn)
402 | TransactionStatus::InTransactionImplicit(mut txn)
403 | TransactionStatus::InTransaction(mut txn) => {
404 if access.is_some() {
405 txn.access = access;
406 }
407 self.transaction = TransactionStatus::InTransaction(txn);
408 }
409 TransactionStatus::Failed(_) => unreachable!(),
410 };
411
412 if let Some(isolation_level) = isolation_level {
413 self.vars
414 .set_local_transaction_isolation(isolation_level.into());
415 }
416
417 Ok(())
418 }
419
420 pub fn start_transaction_implicit(&mut self, wall_time: DateTime<Utc>, stmts: usize) {
423 if let TransactionStatus::Default = self.transaction {
424 let id = self.next_transaction_id;
425 self.next_transaction_id = self.next_transaction_id.wrapping_add(1);
426 let txn = Transaction {
427 pcx: self.new_pcx(wall_time),
428 ops: TransactionOps::None,
429 write_lock_guards: None,
430 access: None,
431 id,
432 };
433 match stmts {
434 1 => self.transaction = TransactionStatus::Started(txn),
435 n if n > 1 => self.transaction = TransactionStatus::InTransactionImplicit(txn),
436 _ => {}
437 }
438 }
439 }
440
    /// Starts an implicit transaction for exactly one statement; equivalent to
    /// `start_transaction_implicit(wall_time, 1)`.
    pub fn start_transaction_single_stmt(&mut self, wall_time: DateTime<Utc>) {
        self.start_transaction_implicit(wall_time, 1);
    }
445
    /// Ends the current transaction and returns its final status.
    ///
    /// All portals are dropped and the session's transaction status is reset to
    /// `Default`. The previous status is returned so the caller can act on it,
    /// which is why the result is `#[must_use]`.
    #[must_use]
    pub fn clear_transaction(&mut self) -> TransactionStatus<T> {
        self.portals.clear();
        self.pcx = None;
        mem::take(&mut self.transaction)
    }
461
462 pub fn fail_transaction(mut self) -> Self {
464 match self.transaction {
465 TransactionStatus::Default => unreachable!(),
466 TransactionStatus::Started(txn)
467 | TransactionStatus::InTransactionImplicit(txn)
468 | TransactionStatus::InTransaction(txn) => {
469 self.transaction = TransactionStatus::Failed(txn);
470 }
471 TransactionStatus::Failed(_) => {}
472 };
473 self
474 }
475
    /// Returns the current transaction status.
    pub fn transaction(&self) -> &TransactionStatus<T> {
        &self.transaction
    }
480
    /// Returns the current transaction status for mutation.
    pub fn transaction_mut(&mut self) -> &mut TransactionStatus<T> {
        &mut self.transaction
    }
485
    /// Returns the [`TransactionCode`] corresponding to the current
    /// transaction status.
    pub fn transaction_code(&self) -> TransactionCode {
        self.transaction().into()
    }
490
    /// Records `add_ops` on the current transaction.
    ///
    /// Delegates to [`TransactionStatus::add_ops`]; see it for the errors
    /// returned and the conditions under which it panics.
    pub fn add_transaction_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
        self.transaction.add_ops(add_ops)
    }
497
    /// Returns a clone of the notice sender, so notices can be queued for this
    /// session without holding a reference to it.
    pub fn retain_notice_transmitter(&self) -> UnboundedSender<AdapterNotice> {
        self.notices_tx.clone()
    }
502
    /// Queues a single notice for the client; shorthand for
    /// `add_notices([notice])`.
    pub fn add_notice(&self, notice: AdapterNotice) {
        self.add_notices([notice])
    }
507
508 pub fn add_notices(&self, notices: impl IntoIterator<Item = AdapterNotice>) {
510 for notice in notices {
511 let _ = self.notices_tx.send(notice);
512 }
513 }
514
515 pub async fn recv_notice(&mut self) -> AdapterNotice {
519 loop {
521 let notice = self
522 .notices_rx
523 .recv()
524 .await
525 .expect("Session also holds a sender, so recv won't ever return None");
526 match self.notice_filter(notice) {
527 Some(notice) => return notice,
528 None => continue,
529 }
530 }
531 }
532
533 pub fn drain_notices(&mut self) -> Vec<AdapterNotice> {
535 let mut notices = Vec::new();
536 while let Ok(notice) = self.notices_rx.try_recv() {
537 if let Some(notice) = self.notice_filter(notice) {
538 notices.push(notice);
539 }
540 }
541 notices
542 }
543
544 fn notice_filter(&self, notice: AdapterNotice) -> Option<AdapterNotice> {
546 let minimum_client_severity = self.vars.client_min_messages();
548 let sev = notice.severity();
549 if !minimum_client_severity.should_output_to_client(&sev) {
550 return None;
551 }
552 if let AdapterNotice::ClusterReplicaStatusChanged { cluster, .. } = ¬ice {
554 if cluster != self.vars.cluster() {
555 return None;
556 }
557 }
558 Some(notice)
559 }
560
561 pub fn clear_transaction_ops(&mut self) {
564 if let Some(txn) = self.transaction.inner_mut() {
565 txn.ops = TransactionOps::None;
566 }
567 }
568
569 pub fn take_transaction_timestamp_context(&mut self) -> Option<TimestampContext<T>> {
574 if let Some(Transaction { ops, .. }) = self.transaction.inner_mut() {
575 if let TransactionOps::Peeks { .. } = ops {
576 let ops = std::mem::take(ops);
577 Some(
578 ops.timestamp_determination()
579 .expect("checked above")
580 .timestamp_context,
581 )
582 } else {
583 None
584 }
585 } else {
586 None
587 }
588 }
589
590 pub fn get_transaction_timestamp_determination(&self) -> Option<TimestampDetermination<T>> {
595 match self.transaction.inner() {
596 Some(Transaction {
597 pcx: _,
598 ops: TransactionOps::Peeks { determination, .. },
599 write_lock_guards: _,
600 access: _,
601 id: _,
602 }) => Some(determination.clone()),
603 _ => None,
604 }
605 }
606
607 pub fn contains_read_timestamp(&self) -> bool {
609 matches!(
610 self.transaction.inner(),
611 Some(Transaction {
612 pcx: _,
613 ops: TransactionOps::Peeks {
614 determination: TimestampDetermination {
615 timestamp_context: TimestampContext::TimelineTimestamp { .. },
616 ..
617 },
618 ..
619 },
620 write_lock_guards: _,
621 access: _,
622 id: _,
623 })
624 )
625 }
626
627 pub fn set_prepared_statement(
629 &mut self,
630 name: String,
631 stmt: Option<Statement<Raw>>,
632 raw_sql: String,
633 desc: StatementDesc,
634 catalog_revision: u64,
635 now: EpochMillis,
636 ) {
637 let logging = PreparedStatementLoggingInfo::still_to_log(
638 raw_sql,
639 stmt.as_ref(),
640 now,
641 name.clone(),
642 self.uuid,
643 false,
644 );
645 let statement = PreparedStatement {
646 stmt,
647 desc,
648 catalog_revision,
649 logging: Arc::new(QCell::new(&self.qcell_owner, logging)),
650 };
651 self.prepared_statements.insert(name, statement);
652 }
653
    /// Removes the prepared statement called `name`.
    ///
    /// Returns `true` if such a statement existed.
    pub fn remove_prepared_statement(&mut self, name: &str) -> bool {
        self.prepared_statements.remove(name).is_some()
    }
660
    /// Removes every prepared statement from the session.
    pub fn remove_all_prepared_statements(&mut self) {
        self.prepared_statements.clear();
    }
665
    /// Looks up the prepared statement called `name`.
    ///
    /// "Unverified": no check of the statement's `catalog_revision` is
    /// performed here, so the caller is responsible for any validation.
    pub fn get_prepared_statement_unverified(&self, name: &str) -> Option<&PreparedStatement> {
        self.prepared_statements.get(name)
    }
673
    /// Looks up the prepared statement called `name` for mutation.
    ///
    /// "Unverified": no check of the statement's `catalog_revision` is
    /// performed here, so the caller is responsible for any validation.
    pub fn get_prepared_statement_mut_unverified(
        &mut self,
        name: &str,
    ) -> Option<&mut PreparedStatement> {
        self.prepared_statements.get_mut(name)
    }
684
    /// Returns all prepared statements, keyed by name.
    pub fn prepared_statements(&self) -> &BTreeMap<String, PreparedStatement> {
        &self.prepared_statements
    }
689
690 pub fn set_portal(
700 &mut self,
701 portal_name: String,
702 desc: StatementDesc,
703 stmt: Option<Statement<Raw>>,
704 logging: Arc<QCell<PreparedStatementLoggingInfo>>,
705 params: Vec<(Datum, ScalarType)>,
706 result_formats: Vec<Format>,
707 catalog_revision: u64,
708 ) -> Result<(), AdapterError> {
709 if !portal_name.is_empty() && self.portals.contains_key(&portal_name) {
711 return Err(AdapterError::DuplicateCursor(portal_name));
712 }
713 let param_types = desc.param_types.clone();
714 self.portals.insert(
715 portal_name,
716 Portal {
717 stmt: stmt.map(Arc::new),
718 desc,
719 catalog_revision,
720 parameters: Params {
721 datums: Row::pack(params.iter().map(|(d, _t)| d)),
722 execute_types: params.into_iter().map(|(_d, t)| t).collect(),
723 expected_types: param_types,
724 },
725 result_formats,
726 state: PortalState::NotStarted,
727 logging,
728 lifecycle_timestamps: None,
729 },
730 );
731 Ok(())
732 }
733
    /// Removes the portal called `portal_name`.
    ///
    /// Returns `true` if such a portal existed.
    pub fn remove_portal(&mut self, portal_name: &str) -> bool {
        self.portals.remove(portal_name).is_some()
    }
740
    /// Looks up the portal called `portal_name`.
    ///
    /// "Unverified": no check of the portal's `catalog_revision` is performed
    /// here, so the caller is responsible for any validation.
    pub fn get_portal_unverified(&self, portal_name: &str) -> Option<&Portal> {
        self.portals.get(portal_name)
    }
747
    /// Looks up the portal called `portal_name` for mutation.
    ///
    /// "Unverified": no check of the portal's `catalog_revision` is performed
    /// here, so the caller is responsible for any validation.
    pub fn get_portal_unverified_mut(&mut self, portal_name: &str) -> Option<&mut Portal> {
        self.portals.get_mut(portal_name)
    }
754
755 pub fn create_new_portal(
757 &mut self,
758 stmt: Option<Statement<Raw>>,
759 logging: Arc<QCell<PreparedStatementLoggingInfo>>,
760 desc: StatementDesc,
761 parameters: Params,
762 result_formats: Vec<Format>,
763 catalog_revision: u64,
764 ) -> Result<String, AdapterError> {
765 for i in 0usize.. {
768 let name = format!("<unnamed portal {}>", i);
769 match self.portals.entry(name.clone()) {
770 Entry::Occupied(_) => continue,
771 Entry::Vacant(entry) => {
772 entry.insert(Portal {
773 stmt: stmt.map(Arc::new),
774 desc,
775 catalog_revision,
776 parameters,
777 result_formats,
778 state: PortalState::NotStarted,
779 logging,
780 lifecycle_timestamps: None,
781 });
782 return Ok(name);
783 }
784 }
785 }
786
787 coord_bail!("unable to create a new portal");
788 }
789
    /// Resets the session: clears the current transaction (and with it all
    /// portals), drops all prepared statements, and resets every session
    /// variable.
    pub fn reset(&mut self) {
        // The previous transaction status is intentionally discarded.
        let _ = self.clear_transaction();
        self.prepared_statements.clear();
        self.vars.reset_all();
    }
797
    /// Returns the value of the `application_name` session variable.
    pub fn application_name(&self) -> &str {
        self.vars.application_name()
    }
804
    /// Returns the session variables.
    pub fn vars(&self) -> &SessionVars {
        &self.vars
    }
809
    /// Returns the session variables for mutation.
    pub fn vars_mut(&mut self) -> &mut SessionVars {
        &mut self.vars
    }
814
    /// Attempts to hand `guards` to the current transaction.
    ///
    /// Delegates to [`TransactionStatus::try_grant_write_locks`].
    ///
    /// # Errors
    ///
    /// Returns a reference to the locks already held if the transaction was
    /// granted write locks earlier; `guards` is dropped in that case.
    ///
    /// # Panics
    ///
    /// Panics if no transaction has been started.
    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
        self.transaction.try_grant_write_locks(guards)
    }
823
824 pub fn apply_external_metadata_updates(&mut self) {
826 let Some(rx) = &mut self.external_metadata_rx else {
828 return;
829 };
830
831 if !rx.has_changed().unwrap_or(false) {
833 return;
834 }
835
836 let metadata = rx.borrow_and_update().clone();
839 self.vars.set_external_user_metadata(metadata);
840 }
841
    /// Sets the session's role metadata to a fresh `RoleMetadata` built from
    /// `role_id`, replacing any previous value.
    pub fn initialize_role_metadata(&mut self, role_id: RoleId) {
        self.role_metadata = Some(RoleMetadata::new(role_id));
    }
846
    /// Returns the session-local timestamp oracle for `timeline`, creating it
    /// first if it does not exist yet.
    ///
    /// A new oracle is initialized with `T::minimum()` and a `NowFn` built
    /// from `T::minimum`.
    pub fn ensure_timestamp_oracle(
        &mut self,
        timeline: Timeline,
    ) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
        self.session_oracles
            .entry(timeline)
            .or_insert_with(|| InMemoryTimestampOracle::new(T::minimum(), NowFn::from(T::minimum)))
    }
857
    /// Returns the session-local timestamp oracle for the
    /// `Timeline::EpochMilliseconds` timeline, creating it if necessary.
    pub fn ensure_local_timestamp_oracle(&mut self) -> &mut InMemoryTimestampOracle<T, NowFn<T>> {
        self.ensure_timestamp_oracle(Timeline::EpochMilliseconds)
    }
863
    /// Returns the session-local timestamp oracle for `timeline`, or `None` if
    /// none has been created yet.
    pub fn get_timestamp_oracle(
        &self,
        timeline: &Timeline,
    ) -> Option<&InMemoryTimestampOracle<T, NowFn<T>>> {
        self.session_oracles.get(timeline)
    }
871
872 pub fn apply_write(&mut self, timestamp: T) {
875 if self.vars().transaction_isolation() == &IsolationLevel::StrongSessionSerializable {
876 self.ensure_local_timestamp_oracle().apply_write(timestamp);
877 }
878 }
879
    /// Returns the metrics handle for this session.
    pub fn metrics(&self) -> &SessionMetrics {
        &self.metrics
    }
884
    /// Stores `fut` as the session's pending builtin table append
    /// notification.
    ///
    /// Replacing an existing pending notification is unexpected and triggers a
    /// soft assertion (or a log message).
    pub fn set_builtin_table_updates(&mut self, fut: BuiltinTableAppendNotify) {
        let prev = self.builtin_updates.replace(fut);
        mz_ore::soft_assert_or_log!(prev.is_none(), "replacing old builtin table notify");
    }
890
891 pub fn clear_builtin_table_updates(&mut self) -> Option<impl Future<Output = ()> + 'static> {
894 if let Some(fut) = self.builtin_updates.take() {
895 let histogram = self
897 .metrics()
898 .session_startup_table_writes_seconds()
899 .clone();
900 Some(async move {
901 fut.wall_time().observe(histogram).await;
902 })
903 } else {
904 None
905 }
906 }
907}
908
/// A statement registered with a session under a name, together with its
/// description and logging state.
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct PreparedStatement {
    /// The parsed statement, or `None` when no statement was supplied.
    stmt: Option<Statement<Raw>>,
    /// Description of the statement.
    desc: StatementDesc,
    /// Catalog revision supplied when the statement was prepared.
    pub catalog_revision: u64,
    /// Statement logging info, guarded by the owning session's `QCellOwner`.
    #[derivative(Debug = "ignore")]
    logging: Arc<QCell<PreparedStatementLoggingInfo>>,
}
920
impl PreparedStatement {
    /// Returns the parsed statement, or `None` when no statement was supplied.
    pub fn stmt(&self) -> Option<&Statement<Raw>> {
        self.stmt.as_ref()
    }

    /// Returns the description of the prepared statement.
    pub fn desc(&self) -> &StatementDesc {
        &self.desc
    }

    /// Returns the shared handle to this statement's logging info.
    pub fn logging(&self) -> &Arc<QCell<PreparedStatementLoggingInfo>> {
        &self.logging
    }
}
938
/// A statement bound to concrete parameters and result formats, ready to be
/// executed.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct Portal {
    /// The bound statement, or `None` when no statement was supplied.
    pub stmt: Option<Arc<Statement<Raw>>>,
    /// Description of the bound statement.
    pub desc: StatementDesc,
    /// Catalog revision supplied when the portal was created.
    pub catalog_revision: u64,
    /// The bound parameter values and their types.
    pub parameters: Params,
    /// Requested formats for the result columns.
    pub result_formats: Vec<Format>,
    /// Statement logging info, guarded by the owning session's `QCellOwner`.
    #[derivative(Debug = "ignore")]
    pub logging: Arc<QCell<PreparedStatementLoggingInfo>>,
    /// Execution state of the portal; excluded from `Debug` output.
    #[derivative(Debug = "ignore")]
    pub state: PortalState,
    /// Lifecycle timestamps for the statement; `None` when the portal is
    /// created.
    pub lifecycle_timestamps: Option<LifecycleTimestamps>,
}
962
/// Execution state of a [`Portal`].
pub enum PortalState {
    /// The portal has not been executed yet; every new portal starts here.
    NotStarted,
    /// Execution is under way; holds the in-progress row state, if any.
    InProgress(Option<InProgressRows>),
    /// Execution finished. The optional string is presumably the completion
    /// tag reported to the client; confirm against the callers.
    Completed(Option<String>),
}
975
/// Row state of a portal whose execution is in progress.
pub struct InProgressRows {
    /// The batch of rows currently being consumed, if any.
    pub current: Option<Box<dyn RowIterator + Send + Sync>>,
    /// The stream from which further batches are obtained.
    pub remaining: RecordFirstRowStream,
}
983
984impl InProgressRows {
985 pub fn new(remaining: RecordFirstRowStream) -> Self {
987 Self {
988 current: None,
989 remaining,
990 }
991 }
992
993 pub fn no_more_rows(&self) -> bool {
996 self.remaining.no_more_rows && self.current.is_none()
997 }
998}
999
/// Receiving end of an unbounded channel that delivers `PeekResponseUnary`
/// values.
pub type RowBatchStream = UnboundedReceiver<PeekResponseUnary>;
1002
/// Timestamps describing points in a statement's lifecycle.
#[derive(Debug, Clone)]
pub struct LifecycleTimestamps {
    /// Time, in epoch milliseconds, recorded as the "received" point.
    // NOTE(review): presumably the moment the request was received from the
    // client; confirm against the callers that build this value.
    pub received: EpochMillis,
}
1013
impl LifecycleTimestamps {
    /// Creates lifecycle timestamps with the given `received` time.
    pub fn new(received: EpochMillis) -> Self {
        Self { received }
    }
}
1020
/// The transaction state of a session.
#[derive(Debug)]
pub enum TransactionStatus<T> {
    /// No transaction is open.
    Default,
    /// An implicit transaction for a single statement, as created by
    /// `Session::start_transaction_implicit` with one statement.
    Started(Transaction<T>),
    /// An explicit transaction, as created or promoted by
    /// `Session::start_transaction`.
    InTransaction(Transaction<T>),
    /// An implicit transaction spanning more than one statement, as created by
    /// `Session::start_transaction_implicit` with several statements.
    InTransactionImplicit(Transaction<T>),
    /// A transaction that has been marked failed by
    /// `Session::fail_transaction`.
    Failed(Transaction<T>),
}
1046
1047impl<T: TimestampManipulation> TransactionStatus<T> {
1048 pub fn into_ops_and_lock_guard(self) -> (Option<TransactionOps<T>>, Option<WriteLocks>) {
1050 match self {
1051 TransactionStatus::Default | TransactionStatus::Failed(_) => (None, None),
1052 TransactionStatus::Started(txn)
1053 | TransactionStatus::InTransaction(txn)
1054 | TransactionStatus::InTransactionImplicit(txn) => {
1055 (Some(txn.ops), txn.write_lock_guards)
1056 }
1057 }
1058 }
1059
    /// Returns the underlying transaction, or `None` for `Default`.
    ///
    /// A `Failed` status still yields its transaction.
    pub fn inner(&self) -> Option<&Transaction<T>> {
        match self {
            TransactionStatus::Default => None,
            TransactionStatus::Started(txn)
            | TransactionStatus::InTransaction(txn)
            | TransactionStatus::InTransactionImplicit(txn)
            | TransactionStatus::Failed(txn) => Some(txn),
        }
    }
1070
    /// Returns the underlying transaction for mutation, or `None` for
    /// `Default`.
    ///
    /// A `Failed` status still yields its transaction.
    pub fn inner_mut(&mut self) -> Option<&mut Transaction<T>> {
        match self {
            TransactionStatus::Default => None,
            TransactionStatus::Started(txn)
            | TransactionStatus::InTransaction(txn)
            | TransactionStatus::InTransactionImplicit(txn)
            | TransactionStatus::Failed(txn) => Some(txn),
        }
    }
1081
1082 pub fn is_ddl(&self) -> bool {
1084 match self {
1085 TransactionStatus::Default => false,
1086 TransactionStatus::Started(txn)
1087 | TransactionStatus::InTransaction(txn)
1088 | TransactionStatus::InTransactionImplicit(txn)
1089 | TransactionStatus::Failed(txn) => {
1090 matches!(txn.ops, TransactionOps::DDL { .. })
1091 }
1092 }
1093 }
1094
    /// Reports whether the transaction was started implicitly, i.e. the
    /// status is `Started` or `InTransactionImplicit`.
    pub fn is_implicit(&self) -> bool {
        match self {
            TransactionStatus::Started(_) | TransactionStatus::InTransactionImplicit(_) => true,
            TransactionStatus::Default
            | TransactionStatus::InTransaction(_)
            | TransactionStatus::Failed(_) => false,
        }
    }
1105
    /// Reports whether the status is one of the multi-statement variants:
    /// `InTransaction` or `InTransactionImplicit`.
    pub fn is_in_multi_statement_transaction(&self) -> bool {
        match self {
            TransactionStatus::InTransaction(_) | TransactionStatus::InTransactionImplicit(_) => {
                true
            }
            TransactionStatus::Default
            | TransactionStatus::Started(_)
            | TransactionStatus::Failed(_) => false,
        }
    }
1117
    /// Reports whether this is a multi-statement transaction and `when` is
    /// `QueryWhen::Immediately`.
    pub fn in_immediate_multi_stmt_txn(&self, when: &QueryWhen) -> bool {
        self.is_in_multi_statement_transaction() && when == &QueryWhen::Immediately
    }
1122
    /// Attempts to hand `guards` to the underlying transaction.
    ///
    /// # Errors
    ///
    /// Returns a reference to the locks already held if the transaction was
    /// granted write locks earlier.
    ///
    /// # Panics
    ///
    /// Panics if the status is `Default`, i.e. no transaction has started.
    pub fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
        match self {
            TransactionStatus::Default => panic!("cannot grant write lock to txn not yet started"),
            TransactionStatus::Started(txn)
            | TransactionStatus::InTransaction(txn)
            | TransactionStatus::InTransactionImplicit(txn)
            | TransactionStatus::Failed(txn) => txn.try_grant_write_locks(guards),
        }
    }
1140
1141 pub fn write_locks(&self) -> Option<&WriteLocks> {
1143 match self {
1144 TransactionStatus::Default => None,
1145 TransactionStatus::Started(txn)
1146 | TransactionStatus::InTransaction(txn)
1147 | TransactionStatus::InTransactionImplicit(txn)
1148 | TransactionStatus::Failed(txn) => txn.write_lock_guards.as_ref(),
1149 }
1150 }
1151
1152 pub fn timeline(&self) -> Option<Timeline> {
1154 match self {
1155 TransactionStatus::Default => None,
1156 TransactionStatus::Started(txn)
1157 | TransactionStatus::InTransaction(txn)
1158 | TransactionStatus::InTransactionImplicit(txn)
1159 | TransactionStatus::Failed(txn) => txn.timeline(),
1160 }
1161 }
1162
1163 pub fn cluster(&self) -> Option<ClusterId> {
1165 match self {
1166 TransactionStatus::Default => None,
1167 TransactionStatus::Started(txn)
1168 | TransactionStatus::InTransaction(txn)
1169 | TransactionStatus::InTransactionImplicit(txn)
1170 | TransactionStatus::Failed(txn) => txn.cluster(),
1171 }
1172 }
1173
1174 pub fn catalog_state(&self) -> Option<&CatalogState> {
1176 match self.inner() {
1177 Some(Transaction {
1178 ops: TransactionOps::DDL { state, .. },
1179 ..
1180 }) => Some(state),
1181 _ => None,
1182 }
1183 }
1184
1185 pub fn contains_ops(&self) -> bool {
1187 match self.inner() {
1188 Some(txn) => txn.contains_ops(),
1189 None => false,
1190 }
1191 }
1192
1193 pub fn allows_writes(&self) -> bool {
1197 match self {
1198 TransactionStatus::Started(Transaction { ops, access, .. })
1199 | TransactionStatus::InTransaction(Transaction { ops, access, .. })
1200 | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
1201 match ops {
1202 TransactionOps::None => access != &Some(TransactionAccessMode::ReadOnly),
1203 TransactionOps::Peeks { determination, .. } => {
1204 !determination.timestamp_context.contains_timestamp()
1208 }
1209 TransactionOps::Subscribe => false,
1210 TransactionOps::Writes(_) => true,
1211 TransactionOps::SingleStatement { .. } => false,
1212 TransactionOps::DDL { .. } => false,
1213 }
1214 }
1215 TransactionStatus::Default | TransactionStatus::Failed(_) => {
1216 unreachable!()
1217 }
1218 }
1219 }
1220
    /// Merges `add_ops` into the operations already recorded on the
    /// transaction, enforcing which kinds of operations may be combined.
    ///
    /// # Errors
    ///
    /// - `ReadOnlyTransaction`: writes added to a `READ ONLY` transaction with
    ///   no operations yet, or anything other than peeks (or writes, when the
    ///   peeks have no timestamp) added to a peek transaction.
    /// - `SubscribeOnlyTransaction`: anything added after a subscribe.
    /// - `WriteOnlyTransaction`: anything other than writes or timestamp-less
    ///   peeks added to a write transaction.
    /// - `SingleStatementTransaction`: anything added to a single-statement
    ///   transaction.
    /// - `DDLTransactionRace`: DDL added with a different catalog revision.
    /// - `DDLOnlyTransaction`: non-DDL added to a DDL transaction.
    ///
    /// # Panics
    ///
    /// Panics if the status is `Default` or `Failed`, if peeks are added for a
    /// different cluster, timeline, or timestamp than the transaction already
    /// uses, or if writes are appended to a `READ ONLY` transaction.
    pub fn add_ops(&mut self, add_ops: TransactionOps<T>) -> Result<(), AdapterError> {
        match self {
            TransactionStatus::Started(Transaction { ops, access, .. })
            | TransactionStatus::InTransaction(Transaction { ops, access, .. })
            | TransactionStatus::InTransactionImplicit(Transaction { ops, access, .. }) => {
                match ops {
                    // Nothing recorded yet: adopt the new ops, unless they are
                    // writes in a read-only transaction.
                    TransactionOps::None => {
                        if matches!(access, Some(TransactionAccessMode::ReadOnly))
                            && matches!(add_ops, TransactionOps::Writes(_))
                        {
                            return Err(AdapterError::ReadOnlyTransaction);
                        }
                        *ops = add_ops;
                    }
                    TransactionOps::Peeks {
                        determination,
                        cluster_id,
                        requires_linearization,
                    } => match add_ops {
                        TransactionOps::Peeks {
                            determination: add_timestamp_determination,
                            cluster_id: add_cluster_id,
                            requires_linearization: add_requires_linearization,
                        } => {
                            // All peeks in a transaction must target one cluster.
                            assert_eq!(*cluster_id, add_cluster_id);
                            match (
                                &determination.timestamp_context,
                                &add_timestamp_determination.timestamp_context,
                            ) {
                                // Both carry a timestamp: they must agree on
                                // timeline and chosen timestamp.
                                (
                                    TimestampContext::TimelineTimestamp {
                                        timeline: txn_timeline,
                                        chosen_ts: txn_ts,
                                        oracle_ts: _,
                                    },
                                    TimestampContext::TimelineTimestamp {
                                        timeline: add_timeline,
                                        chosen_ts: add_ts,
                                        oracle_ts: _,
                                    },
                                ) => {
                                    assert_eq!(txn_timeline, add_timeline);
                                    assert_eq!(txn_ts, add_ts);
                                }
                                // The transaction has no timestamp yet: take
                                // over the incoming determination.
                                (TimestampContext::NoTimestamp, _) => {
                                    *determination = add_timestamp_determination
                                }
                                // The incoming peek has no timestamp: keep the
                                // existing determination.
                                (_, TimestampContext::NoTimestamp) => {}
                            };
                            // Linearization can only be upgraded to required,
                            // never downgraded.
                            if matches!(requires_linearization, RequireLinearization::NotRequired)
                                && matches!(
                                    add_requires_linearization,
                                    RequireLinearization::Required
                                )
                            {
                                *requires_linearization = add_requires_linearization;
                            }
                        }
                        // Timestamp-less peeks may be replaced by writes,
                        // turning this into a write transaction.
                        writes @ TransactionOps::Writes(..)
                            if !determination.timestamp_context.contains_timestamp() =>
                        {
                            *ops = writes;
                        }
                        _ => return Err(AdapterError::ReadOnlyTransaction),
                    },
                    TransactionOps::Subscribe => {
                        return Err(AdapterError::SubscribeOnlyTransaction);
                    }
                    TransactionOps::Writes(txn_writes) => match add_ops {
                        TransactionOps::Writes(mut add_writes) => {
                            // A read-only transaction can never have reached
                            // the `Writes` state.
                            assert!(!matches!(access, Some(TransactionAccessMode::ReadOnly)));
                            txn_writes.append(&mut add_writes);
                        }
                        // Timestamp-less peeks are tolerated in a write
                        // transaction and leave the writes untouched.
                        TransactionOps::Peeks { determination, .. }
                            if !determination.timestamp_context.contains_timestamp() => {}
                        _ => {
                            return Err(AdapterError::WriteOnlyTransaction);
                        }
                    },
                    TransactionOps::SingleStatement { .. } => {
                        return Err(AdapterError::SingleStatementTransaction);
                    }
                    TransactionOps::DDL {
                        ops: og_ops,
                        revision: og_revision,
                        state: og_state,
                    } => match add_ops {
                        TransactionOps::DDL {
                            ops: new_ops,
                            revision: new_revision,
                            state: new_state,
                        } => {
                            // The catalog must not have changed underneath the
                            // transaction.
                            if *og_revision != new_revision {
                                return Err(AdapterError::DDLTransactionRace);
                            }
                            // Non-empty new ops replace (not extend) the
                            // previous ops and state; empty ones change nothing.
                            if !new_ops.is_empty() {
                                *og_ops = new_ops;
                                *og_state = new_state;
                            }
                        }
                        _ => return Err(AdapterError::DDLOnlyTransaction),
                    },
                }
            }
            TransactionStatus::Default | TransactionStatus::Failed(_) => {
                unreachable!()
            }
        }
        Ok(())
    }
1351}
1352
/// Identifier of a transaction within a session; assigned from a counter that
/// wraps on overflow.
pub type TransactionId = u64;
1355
/// The default status is `TransactionStatus::Default` (no transaction), which
/// is what `mem::take` leaves behind when a status is moved out.
impl<T> Default for TransactionStatus<T> {
    fn default() -> Self {
        TransactionStatus::Default
    }
}
1361
/// State of an open transaction.
#[derive(Debug)]
pub struct Transaction<T> {
    /// Plan context created when the transaction was started.
    pub pcx: PlanContext,
    /// Operations performed by the transaction so far.
    pub ops: TransactionOps<T>,
    /// Identifier assigned from the session's transaction counter.
    pub id: TransactionId,
    /// Write locks granted to this transaction, if any. Set at most once, via
    /// `try_grant_write_locks`.
    write_lock_guards: Option<WriteLocks>,
    /// Access mode requested for the transaction, if one was specified.
    access: Option<TransactionAccessMode>,
}
1379
impl<T> Transaction<T> {
    /// Stores `guards` as this transaction's write locks if it holds none.
    ///
    /// # Errors
    ///
    /// Returns a reference to the existing locks if some were already granted;
    /// `guards` is dropped in that case.
    fn try_grant_write_locks(&mut self, guards: WriteLocks) -> Result<(), &WriteLocks> {
        // Matching on a single mutable borrow lets one arm return a reference
        // while the other assigns through the same borrow.
        match &mut self.write_lock_guards {
            Some(existing) => Err(existing),
            locks @ None => {
                *locks = Some(guards);
                Ok(())
            }
        }
    }

    /// Returns the timeline the transaction is bound to: the timeline of its
    /// peeks' `TimelineTimestamp` context. Every other kind of operation,
    /// including peeks without such a context, yields `None`.
    fn timeline(&self) -> Option<Timeline> {
        match &self.ops {
            TransactionOps::Peeks {
                determination:
                    TimestampDetermination {
                        timestamp_context: TimestampContext::TimelineTimestamp { timeline, .. },
                        ..
                    },
                ..
            } => Some(timeline.clone()),
            TransactionOps::Peeks { .. }
            | TransactionOps::None
            | TransactionOps::Subscribe
            | TransactionOps::Writes(_)
            | TransactionOps::SingleStatement { .. }
            | TransactionOps::DDL { .. } => None,
        }
    }

    /// Returns the cluster the transaction's peeks run on; `None` for every
    /// other kind of operation.
    pub fn cluster(&self) -> Option<ClusterId> {
        match &self.ops {
            TransactionOps::Peeks { cluster_id, .. } => Some(cluster_id.clone()),
            TransactionOps::None
            | TransactionOps::Subscribe
            | TransactionOps::Writes(_)
            | TransactionOps::SingleStatement { .. }
            | TransactionOps::DDL { .. } => None,
        }
    }

    /// Reports whether any operations have been recorded, i.e. `ops` is not
    /// `TransactionOps::None`.
    fn contains_ops(&self) -> bool {
        !matches!(self.ops, TransactionOps::None)
    }
}
1430
/// Coarse transaction status reported for a session.
///
/// Each code converts to a single ASCII byte: `I`, `T`, or `E`. These
/// presumably correspond to the transaction status indicator of the
/// PostgreSQL wire protocol; confirm against the pgwire layer.
#[derive(Debug, Clone, Copy)]
pub enum TransactionCode {
    /// No transaction is open.
    Idle,
    /// A transaction is open.
    InTransaction,
    /// The open transaction has failed.
    Failed,
}

impl From<TransactionCode> for u8 {
    /// Maps the code to its single-byte representation.
    fn from(code: TransactionCode) -> Self {
        let tag: u8 = match code {
            TransactionCode::Idle => b'I',
            TransactionCode::InTransaction => b'T',
            TransactionCode::Failed => b'E',
        };
        tag
    }
}

impl From<TransactionCode> for String {
    /// Maps the code to a one-character string holding its byte
    /// representation.
    fn from(code: TransactionCode) -> Self {
        let tag = u8::from(code);
        String::from(char::from(tag))
    }
}
1457
1458impl<T> From<&TransactionStatus<T>> for TransactionCode {
1459 fn from(status: &TransactionStatus<T>) -> TransactionCode {
1461 match status {
1462 TransactionStatus::Default => TransactionCode::Idle,
1463 TransactionStatus::Started(_) => TransactionCode::InTransaction,
1464 TransactionStatus::InTransaction(_) => TransactionCode::InTransaction,
1465 TransactionStatus::InTransactionImplicit(_) => TransactionCode::InTransaction,
1466 TransactionStatus::Failed(_) => TransactionCode::Failed,
1467 }
1468 }
1469}
1470
1471#[derive(Debug)]
1477pub enum TransactionOps<T> {
1478 None,
1481 Peeks {
1486 determination: TimestampDetermination<T>,
1488 cluster_id: ClusterId,
1490 requires_linearization: RequireLinearization,
1492 },
1493 Subscribe,
1495 Writes(Vec<WriteOp>),
1498 SingleStatement {
1500 stmt: Arc<Statement<Raw>>,
1502 params: mz_sql::plan::Params,
1504 },
1505 DDL {
1509 ops: Vec<crate::catalog::Op>,
1511 state: CatalogState,
1513 revision: u64,
1515 },
1516}
1517
1518impl<T> TransactionOps<T> {
1519 fn timestamp_determination(self) -> Option<TimestampDetermination<T>> {
1520 match self {
1521 TransactionOps::Peeks { determination, .. } => Some(determination),
1522 TransactionOps::None
1523 | TransactionOps::Subscribe
1524 | TransactionOps::Writes(_)
1525 | TransactionOps::SingleStatement { .. }
1526 | TransactionOps::DDL { .. } => None,
1527 }
1528 }
1529}
1530
1531impl<T> Default for TransactionOps<T> {
1532 fn default() -> Self {
1533 Self::None
1534 }
1535}
1536
1537#[derive(Debug, Clone, PartialEq)]
1539pub struct WriteOp {
1540 pub id: CatalogItemId,
1542 pub rows: TableData,
1544}
1545
1546#[derive(Debug)]
1548pub enum RequireLinearization {
1549 Required,
1551 NotRequired,
1553}
1554
1555impl From<&ExplainContext> for RequireLinearization {
1556 fn from(ctx: &ExplainContext) -> Self {
1557 match ctx {
1558 ExplainContext::None | ExplainContext::PlanInsightsNotice(_) => {
1559 RequireLinearization::Required
1560 }
1561 _ => RequireLinearization::NotRequired,
1562 }
1563 }
1564}
1565
1566#[derive(Debug)]
1570pub struct WriteLocks {
1571 locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
1572 conn_id: ConnectionId,
1574}
1575
1576impl WriteLocks {
1577 pub fn builder(sources: impl IntoIterator<Item = CatalogItemId>) -> WriteLocksBuilder {
1582 let locks = sources.into_iter().map(|gid| (gid, None)).collect();
1583 WriteLocksBuilder { locks }
1584 }
1585
1586 pub fn validate(
1589 self,
1590 collections: impl Iterator<Item = CatalogItemId>,
1591 ) -> Result<Self, BTreeSet<CatalogItemId>> {
1592 let mut missing = BTreeSet::new();
1593 for collection in collections {
1594 if !self.locks.contains_key(&collection) {
1595 missing.insert(collection);
1596 }
1597 }
1598
1599 if missing.is_empty() {
1600 Ok(self)
1601 } else {
1602 drop(self);
1604 Err(missing)
1605 }
1606 }
1607}
1608
1609impl Drop for WriteLocks {
1610 fn drop(&mut self) {
1611 if !self.locks.is_empty() {
1613 tracing::info!(
1614 conn_id = %self.conn_id,
1615 locks = ?self.locks,
1616 "dropping write locks",
1617 );
1618 }
1619 }
1620}
1621
1622#[derive(Debug)]
1626pub struct WriteLocksBuilder {
1627 locks: BTreeMap<CatalogItemId, Option<tokio::sync::OwnedMutexGuard<()>>>,
1628}
1629
1630impl WriteLocksBuilder {
1631 pub fn insert_lock(&mut self, id: CatalogItemId, lock: tokio::sync::OwnedMutexGuard<()>) {
1633 self.locks.insert(id, Some(lock));
1634 }
1635
1636 pub fn all_or_nothing(self, conn_id: &ConnectionId) -> Result<WriteLocks, CatalogItemId> {
1641 let (locks, missing): (BTreeMap<_, _>, BTreeSet<_>) =
1642 self.locks
1643 .into_iter()
1644 .partition_map(|(gid, lock)| match lock {
1645 Some(lock) => itertools::Either::Left((gid, lock)),
1646 None => itertools::Either::Right(gid),
1647 });
1648
1649 match missing.iter().next() {
1650 None => {
1651 tracing::info!(%conn_id, ?locks, "acquired write locks");
1652 Ok(WriteLocks {
1653 locks,
1654 conn_id: conn_id.clone(),
1655 })
1656 }
1657 Some(gid) => {
1658 tracing::info!(?missing, "failed to acquire write locks");
1659 drop(locks);
1661 Err(*gid)
1662 }
1663 }
1664 }
1665}
1666
1667#[derive(Debug, Default)]
1716pub(crate) struct GroupCommitWriteLocks {
1717 locks: BTreeMap<CatalogItemId, tokio::sync::OwnedMutexGuard<()>>,
1718}
1719
1720impl GroupCommitWriteLocks {
1721 pub fn merge(&mut self, mut locks: WriteLocks) {
1723 let existing = std::mem::take(&mut locks.locks);
1727 self.locks.extend(existing);
1728 }
1729
1730 pub fn missing_locks(
1732 &self,
1733 writes: impl Iterator<Item = CatalogItemId>,
1734 ) -> BTreeSet<CatalogItemId> {
1735 let mut missing = BTreeSet::new();
1736 for write in writes {
1737 if !self.locks.contains_key(&write) {
1738 missing.insert(write);
1739 }
1740 }
1741 missing
1742 }
1743}
1744
1745impl Drop for GroupCommitWriteLocks {
1746 fn drop(&mut self) {
1747 if !self.locks.is_empty() {
1748 tracing::info!(
1749 locks = ?self.locks,
1750 "dropping group commit write locks",
1751 );
1752 }
1753 }
1754}