1use std::borrow::Cow;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Display, Formatter};
13use std::future::Future;
14use std::pin::{self, Pin};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use anyhow::bail;
19use chrono::{DateTime, Utc};
20use derivative::Derivative;
21use futures::{Stream, StreamExt};
22use itertools::Itertools;
23use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
24use mz_auth::Authenticated;
25use mz_auth::password::Password;
26use mz_build_info::BuildInfo;
27use mz_compute_types::ComputeInstanceId;
28use mz_ore::channel::OneshotReceiverExt;
29use mz_ore::collections::CollectionExt;
30use mz_ore::id_gen::{IdAllocator, IdAllocatorInnerBitSet, MAX_ORG_ID, org_id_conn_bits};
31use mz_ore::instrument;
32use mz_ore::now::{EpochMillis, NowFn, to_datetime};
33use mz_ore::result::ResultExt;
34use mz_ore::task::AbortOnDropHandle;
35use mz_ore::thread::JoinOnDropHandle;
36use mz_ore::tracing::OpenTelemetryContext;
37use mz_repr::user::InternalUserMetadata;
38use mz_repr::{CatalogItemId, ColumnIndex, Row, SqlScalarType};
39use mz_sql::ast::{Raw, Statement};
40use mz_sql::catalog::{EnvironmentId, SessionCatalog};
41use mz_sql::session::hint::ApplicationNameHint;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_sql::session::user::SUPPORT_USER;
44use mz_sql::session::vars::{
45 CLUSTER, ENABLE_FRONTEND_PEEK_SEQUENCING, OwnedVarInput, SystemVars, Var,
46};
47use mz_sql_parser::parser::{ParserStatementError, StatementParseResult};
48use prometheus::Histogram;
49use serde_json::json;
50use tokio::sync::{mpsc, oneshot};
51use tracing::{debug, error};
52use uuid::Uuid;
53
54use crate::catalog::Catalog;
55use crate::command::{
56 CatalogDump, CatalogSnapshot, Command, ExecuteResponse, Response, SASLChallengeResponse,
57 SASLVerifyProofResponse, SuperuserAttribute,
58};
59use crate::coord::{Coordinator, ExecuteContextGuard};
60use crate::error::AdapterError;
61use crate::metrics::Metrics;
62use crate::optimize::dataflows::{EvalTime, ExprPrepOneShot};
63use crate::optimize::{self, Optimize};
64use crate::session::{
65 EndTransactionAction, PreparedStatement, Session, SessionConfig, StateRevision, TransactionId,
66};
67use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
68use crate::telemetry::{self, EventDetails, SegmentClientExt, StatementFailureType};
69use crate::webhook::AppendWebhookResponse;
70use crate::{AdapterNotice, AppendWebhookError, PeekClient, PeekResponseUnary, StartupResponse};
71
72pub struct Handle {
78 pub(crate) session_id: Uuid,
79 pub(crate) start_instant: Instant,
80 pub(crate) _thread: JoinOnDropHandle<()>,
81}
82
83impl Handle {
84 pub fn session_id(&self) -> Uuid {
90 self.session_id
91 }
92
93 pub fn start_instant(&self) -> Instant {
95 self.start_instant
96 }
97}
98
99#[derive(Debug, Clone)]
107pub struct Client {
108 build_info: &'static BuildInfo,
109 inner_cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
110 id_alloc: IdAllocator<IdAllocatorInnerBitSet>,
111 now: NowFn,
112 metrics: Metrics,
113 environment_id: EnvironmentId,
114 segment_client: Option<mz_segment::Client>,
115}
116
117impl Client {
118 pub(crate) fn new(
119 build_info: &'static BuildInfo,
120 cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
121 metrics: Metrics,
122 now: NowFn,
123 environment_id: EnvironmentId,
124 segment_client: Option<mz_segment::Client>,
125 ) -> Client {
126 let env_lower = org_id_conn_bits(&environment_id.organization_id());
134 Client {
135 build_info,
136 inner_cmd_tx: cmd_tx,
137 id_alloc: IdAllocator::new(1, MAX_ORG_ID, env_lower),
138 now,
139 metrics,
140 environment_id,
141 segment_client,
142 }
143 }
144
145 pub fn new_conn_id(&self) -> Result<ConnectionId, AdapterError> {
147 self.id_alloc.alloc().ok_or(AdapterError::IdExhaustionError)
148 }
149
150 pub fn new_session(&self, config: SessionConfig, _authenticated: Authenticated) -> Session {
156 Session::new(self.build_info, config, self.metrics().session_metrics())
160 }
161
162 pub async fn authenticate(
165 &self,
166 user: &String,
167 password: &Password,
168 ) -> Result<Authenticated, AdapterError> {
169 let (tx, rx) = oneshot::channel();
170 self.send(Command::AuthenticatePassword {
171 role_name: user.to_string(),
172 password: Some(password.clone()),
173 tx,
174 });
175 rx.await.expect("sender dropped")?;
176 Ok(Authenticated)
177 }
178
179 pub async fn generate_sasl_challenge(
180 &self,
181 user: &String,
182 client_nonce: &String,
183 ) -> Result<SASLChallengeResponse, AdapterError> {
184 let (tx, rx) = oneshot::channel();
185 self.send(Command::AuthenticateGetSASLChallenge {
186 role_name: user.to_string(),
187 nonce: client_nonce.to_string(),
188 tx,
189 });
190 let response = rx.await.expect("sender dropped")?;
191 Ok(response)
192 }
193
194 pub async fn verify_sasl_proof(
195 &self,
196 user: &String,
197 proof: &String,
198 nonce: &String,
199 mock_hash: &String,
200 ) -> Result<(SASLVerifyProofResponse, Authenticated), AdapterError> {
201 let (tx, rx) = oneshot::channel();
202 self.send(Command::AuthenticateVerifySASLProof {
203 role_name: user.to_string(),
204 proof: proof.to_string(),
205 auth_message: nonce.to_string(),
206 mock_hash: mock_hash.to_string(),
207 tx,
208 });
209 let response = rx.await.expect("sender dropped")?;
210 Ok((response, Authenticated))
211 }
212
213 #[mz_ore::instrument(level = "debug")]
222 pub async fn startup(&self, session: Session) -> Result<SessionClient, AdapterError> {
223 let user = session.user().clone();
224 let conn_id = session.conn_id().clone();
225 let secret_key = session.secret_key();
226 let uuid = session.uuid();
227 let client_ip = session.client_ip();
228 let application_name = session.application_name().into();
229 let notice_tx = session.retain_notice_transmitter();
230
231 let (tx, rx) = oneshot::channel();
232
233 let rx = rx.with_guard(|_| {
239 self.send(Command::Terminate {
240 conn_id: conn_id.clone(),
241 tx: None,
242 });
243 });
244
245 self.send(Command::Startup {
246 tx,
247 user,
248 conn_id: conn_id.clone(),
249 secret_key,
250 uuid,
251 client_ip: client_ip.copied(),
252 application_name,
253 notice_tx,
254 });
255
256 let response = rx.await.expect("sender dropped")?;
259
260 let StartupResponse {
264 role_id,
265 write_notify,
266 session_defaults,
267 catalog,
268 storage_collections,
269 transient_id_gen,
270 optimizer_metrics,
271 persist_client,
272 statement_logging_frontend,
273 superuser_attribute,
274 } = response;
275
276 let peek_client = PeekClient::new(
277 self.clone(),
278 storage_collections,
279 transient_id_gen,
280 optimizer_metrics,
281 persist_client,
282 statement_logging_frontend,
283 );
284
285 let mut client = SessionClient {
286 inner: Some(self.clone()),
287 session: Some(session),
288 timeouts: Timeout::new(),
289 environment_id: self.environment_id.clone(),
290 segment_client: self.segment_client.clone(),
291 peek_client,
292 enable_frontend_peek_sequencing: false, };
294
295 let session = client.session();
296
297 if let SuperuserAttribute(Some(superuser)) = superuser_attribute {
300 session.apply_internal_user_metadata(InternalUserMetadata { superuser });
301 }
302
303 session.initialize_role_metadata(role_id);
304 let vars_mut = session.vars_mut();
305 for (name, val) in session_defaults {
306 if let Err(err) = vars_mut.set_default(&name, val.borrow()) {
307 tracing::error!("failed to set peristed default, {err:?}");
310 }
311 }
312 session
313 .vars_mut()
314 .end_transaction(EndTransactionAction::Commit);
315
316 session.set_builtin_table_updates(write_notify);
324
325 let catalog = catalog.for_session(session);
326
327 let cluster_active = session.vars().cluster().to_string();
328 if session.vars().welcome_message() {
329 let cluster_info = if catalog.resolve_cluster(Some(&cluster_active)).is_err() {
330 format!("{cluster_active} (does not exist)")
331 } else {
332 cluster_active.to_string()
333 };
334
335 session.add_notice(AdapterNotice::Welcome(format!(
339 "connected to Materialize v{}
340 Org ID: {}
341 Region: {}
342 User: {}
343 Cluster: {}
344 Database: {}
345 {}
346 Session UUID: {}
347
348Issue a SQL query to get started. Need help?
349 View documentation: https://materialize.com/s/docs
350 Join our Slack community: https://materialize.com/s/chat
351 ",
352 session.vars().build_info().semver_version(),
353 self.environment_id.organization_id(),
354 self.environment_id.region(),
355 session.vars().user().name,
356 cluster_info,
357 session.vars().database(),
358 match session.vars().search_path() {
359 [schema] => format!("Schema: {}", schema),
360 schemas => format!(
361 "Search path: {}",
362 schemas.iter().map(|id| id.to_string()).join(", ")
363 ),
364 },
365 session.uuid(),
366 )));
367 }
368
369 if session.vars().current_object_missing_warnings() {
370 if catalog.active_database().is_none() {
371 let db = session.vars().database().into();
372 session.add_notice(AdapterNotice::UnknownSessionDatabase(db));
373 }
374 }
375
376 let cluster_var = session
379 .vars()
380 .inspect(CLUSTER.name())
381 .expect("cluster should exist");
382 if session.vars().current_object_missing_warnings()
383 && catalog.resolve_cluster(Some(&cluster_active)).is_err()
384 {
385 let cluster_notice = 'notice: {
386 if cluster_var.inspect_session_value().is_some() {
387 break 'notice Some(AdapterNotice::DefaultClusterDoesNotExist {
388 name: cluster_active,
389 kind: "session",
390 suggested_action: "Pick an extant cluster with SET CLUSTER = name. Run SHOW CLUSTERS to see available clusters.".into(),
391 });
392 }
393
394 let role_default = catalog.get_role(catalog.active_role_id());
395 let role_cluster = match role_default.vars().get(CLUSTER.name()) {
396 Some(OwnedVarInput::Flat(name)) => Some(name),
397 None => None,
398 Some(v @ OwnedVarInput::SqlSet(_)) => {
400 tracing::warn!(?v, "SqlSet found for cluster Role Default");
401 break 'notice None;
402 }
403 };
404
405 let alter_role = "with `ALTER ROLE <role> SET cluster TO <cluster>;`";
406 match role_cluster {
407 None => Some(AdapterNotice::DefaultClusterDoesNotExist {
409 name: cluster_active,
410 kind: "system",
411 suggested_action: format!(
412 "Set a default cluster for the current role {alter_role}."
413 ),
414 }),
415 Some(_) => Some(AdapterNotice::DefaultClusterDoesNotExist {
417 name: cluster_active,
418 kind: "role",
419 suggested_action: format!(
420 "Change the default cluster for the current role {alter_role}."
421 ),
422 }),
423 }
424 };
425
426 if let Some(notice) = cluster_notice {
427 session.add_notice(notice);
428 }
429 }
430
431 client.enable_frontend_peek_sequencing = ENABLE_FRONTEND_PEEK_SEQUENCING
432 .require(catalog.system_vars())
433 .is_ok();
434
435 Ok(client)
436 }
437
438 pub fn cancel_request(&self, conn_id: ConnectionIdType, secret_key: u32) {
440 self.send(Command::CancelRequest {
441 conn_id,
442 secret_key,
443 });
444 }
445
446 pub async fn support_execute_one(
449 &self,
450 sql: &str,
451 ) -> Result<Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send>>, anyhow::Error> {
452 let conn_id = self.new_conn_id()?;
454 let session = self.new_session(
455 SessionConfig {
456 conn_id,
457 uuid: Uuid::new_v4(),
458 user: SUPPORT_USER.name.clone(),
459 client_ip: None,
460 external_metadata_rx: None,
461 helm_chart_version: None,
462 },
463 Authenticated,
464 );
465 let mut session_client = self.startup(session).await?;
466
467 let stmts = mz_sql::parse::parse(sql)?;
469 if stmts.len() != 1 {
470 bail!("must supply exactly one query");
471 }
472 let StatementParseResult { ast: stmt, sql } = stmts.into_element();
473
474 const EMPTY_PORTAL: &str = "";
475 session_client.start_transaction(Some(1))?;
476 session_client
477 .declare(EMPTY_PORTAL.into(), stmt, sql.to_string())
478 .await?;
479
480 match session_client
481 .execute(EMPTY_PORTAL.into(), futures::future::pending(), None)
482 .await?
483 {
484 (ExecuteResponse::SendingRowsStreaming { mut rows, .. }, _) => {
485 let owning_response_stream = async_stream::stream! {
490 while let Some(rows) = rows.next().await {
491 yield rows;
492 }
493 drop(session_client);
494 };
495 Ok(Box::pin(owning_response_stream))
496 }
497 r => bail!("unsupported response type: {r:?}"),
498 }
499 }
500
501 pub fn metrics(&self) -> &Metrics {
503 &self.metrics
504 }
505
506 pub fn now(&self) -> DateTime<Utc> {
508 to_datetime((self.now)())
509 }
510
511 pub async fn get_webhook_appender(
513 &self,
514 database: String,
515 schema: String,
516 name: String,
517 ) -> Result<AppendWebhookResponse, AppendWebhookError> {
518 let (tx, rx) = oneshot::channel();
519
520 self.send(Command::GetWebhook {
522 database,
523 schema,
524 name,
525 tx,
526 });
527
528 let response = rx
530 .await
531 .map_err(|_| anyhow::anyhow!("failed to receive webhook response"))?;
532
533 response
534 }
535
536 pub async fn get_system_vars(&self) -> SystemVars {
538 let (tx, rx) = oneshot::channel();
539 self.send(Command::GetSystemVars { tx });
540 rx.await.expect("coordinator unexpectedly gone")
541 }
542
543 #[instrument(level = "debug")]
544 pub(crate) fn send(&self, cmd: Command) {
545 self.inner_cmd_tx
546 .send((OpenTelemetryContext::obtain(), cmd))
547 .expect("coordinator unexpectedly gone");
548 }
549}
550
551pub struct SessionClient {
555 inner: Option<Client>,
559 session: Option<Session>,
562 timeouts: Timeout,
563 segment_client: Option<mz_segment::Client>,
564 environment_id: EnvironmentId,
565 peek_client: PeekClient,
567 pub enable_frontend_peek_sequencing: bool,
572}
573
574impl SessionClient {
575 pub fn parse<'a>(
578 &self,
579 sql: &'a str,
580 ) -> Result<Result<Vec<StatementParseResult<'a>>, ParserStatementError>, String> {
581 match mz_sql::parse::parse_with_limit(sql) {
582 Ok(Err(e)) => {
583 self.track_statement_parse_failure(&e);
584 Ok(Err(e))
585 }
586 r => r,
587 }
588 }
589
590 fn track_statement_parse_failure(&self, parse_error: &ParserStatementError) {
591 let session = self.session.as_ref().expect("session invariant violated");
592 let Some(user_id) = session.user().external_metadata.as_ref().map(|m| m.user_id) else {
593 return;
594 };
595 let Some(segment_client) = &self.segment_client else {
596 return;
597 };
598 let Some(statement_kind) = parse_error.statement else {
599 return;
600 };
601 let Some((action, object_type)) = telemetry::analyze_audited_statement(statement_kind)
602 else {
603 return;
604 };
605 let event_type = StatementFailureType::ParseFailure;
606 let event_name = format!(
607 "{} {} {}",
608 object_type.as_title_case(),
609 action.as_title_case(),
610 event_type.as_title_case(),
611 );
612 segment_client.environment_track(
613 &self.environment_id,
614 event_name,
615 json!({
616 "statement_kind": statement_kind,
617 "error": &parse_error.error,
618 }),
619 EventDetails {
620 user_id: Some(user_id),
621 application_name: Some(session.application_name()),
622 ..Default::default()
623 },
624 );
625 }
626
627 pub async fn get_prepared_statement(
630 &mut self,
631 name: &str,
632 ) -> Result<&PreparedStatement, AdapterError> {
633 let catalog = self.catalog_snapshot("get_prepared_statement").await;
634 Coordinator::verify_prepared_statement(&catalog, self.session(), name)?;
635 Ok(self
636 .session()
637 .get_prepared_statement_unverified(name)
638 .expect("must exist"))
639 }
640
641 pub async fn prepare(
646 &mut self,
647 name: String,
648 stmt: Option<Statement<Raw>>,
649 sql: String,
650 param_types: Vec<Option<SqlScalarType>>,
651 ) -> Result<(), AdapterError> {
652 let catalog = self.catalog_snapshot("prepare").await;
653
654 let mut async_pause = false;
657 (|| {
658 fail::fail_point!("async_prepare", |val| {
659 async_pause = val.map_or(false, |val| val.parse().unwrap_or(false))
660 });
661 })();
662 if async_pause {
663 tokio::time::sleep(Duration::from_secs(1)).await;
664 };
665
666 let desc = Coordinator::describe(&catalog, self.session(), stmt.clone(), param_types)?;
667 let now = self.now();
668 let state_revision = StateRevision {
669 catalog_revision: catalog.transient_revision(),
670 session_state_revision: self.session().state_revision(),
671 };
672 self.session()
673 .set_prepared_statement(name, stmt, sql, desc, state_revision, now);
674 Ok(())
675 }
676
677 #[mz_ore::instrument(level = "debug")]
679 pub async fn declare(
680 &mut self,
681 name: String,
682 stmt: Statement<Raw>,
683 sql: String,
684 ) -> Result<(), AdapterError> {
685 let catalog = self.catalog_snapshot("declare").await;
686 let param_types = vec![];
687 let desc =
688 Coordinator::describe(&catalog, self.session(), Some(stmt.clone()), param_types)?;
689 let params = vec![];
690 let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
691 let now = self.now();
692 let logging = self.session().mint_logging(sql, Some(&stmt), now);
693 let state_revision = StateRevision {
694 catalog_revision: catalog.transient_revision(),
695 session_state_revision: self.session().state_revision(),
696 };
697 self.session().set_portal(
698 name,
699 desc,
700 Some(stmt),
701 logging,
702 params,
703 result_formats,
704 state_revision,
705 )?;
706 Ok(())
707 }
708
709 #[mz_ore::instrument(level = "debug")]
716 pub async fn execute(
717 &mut self,
718 portal_name: String,
719 cancel_future: impl Future<Output = std::io::Error> + Send,
720 outer_ctx_extra: Option<ExecuteContextGuard>,
721 ) -> Result<(ExecuteResponse, Instant), AdapterError> {
722 let execute_started = Instant::now();
723
724 let mut outer_ctx_extra = outer_ctx_extra;
728 if let Some(resp) = self
729 .try_frontend_peek(&portal_name, &mut outer_ctx_extra)
730 .await?
731 {
732 debug!("frontend peek succeeded");
733 return Ok((resp, execute_started));
736 } else {
737 debug!("frontend peek did not happen, falling back to `Command::Execute`");
738 }
743
744 let response = self
745 .send_with_cancel(
746 |tx, session| Command::Execute {
747 portal_name,
748 session,
749 tx,
750 outer_ctx_extra,
751 },
752 cancel_future,
753 )
754 .await?;
755 Ok((response, execute_started))
756 }
757
758 fn now(&self) -> EpochMillis {
759 (self.inner().now)()
760 }
761
762 fn now_datetime(&self) -> DateTime<Utc> {
763 to_datetime(self.now())
764 }
765
766 pub fn start_transaction(&mut self, implicit: Option<usize>) -> Result<(), AdapterError> {
772 let now = self.now_datetime();
773 let session = self.session.as_mut().expect("session invariant violated");
774 let result = match implicit {
775 None => session.start_transaction(now, None, None),
776 Some(stmts) => {
777 session.start_transaction_implicit(now, stmts);
778 Ok(())
779 }
780 };
781 result
782 }
783
784 #[instrument(level = "debug")]
787 pub async fn end_transaction(
788 &mut self,
789 action: EndTransactionAction,
790 ) -> Result<ExecuteResponse, AdapterError> {
791 let res = self
792 .send(|tx, session| Command::Commit {
793 action,
794 session,
795 tx,
796 })
797 .await;
798 let _ = self.session().clear_transaction();
802 res
803 }
804
805 pub fn fail_transaction(&mut self) {
807 let session = self.session.take().expect("session invariant violated");
808 let session = session.fail_transaction();
809 self.session = Some(session);
810 }
811
812 #[instrument(level = "debug")]
814 pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
815 let start = std::time::Instant::now();
816 let CatalogSnapshot { catalog } = self
817 .send_without_session(|tx| Command::CatalogSnapshot { tx })
818 .await;
819 self.inner()
820 .metrics()
821 .catalog_snapshot_seconds
822 .with_label_values(&[context])
823 .observe(start.elapsed().as_secs_f64());
824 catalog
825 }
826
827 pub async fn dump_catalog(&self) -> Result<CatalogDump, AdapterError> {
832 let catalog = self.catalog_snapshot("dump_catalog").await;
833 catalog.dump().map_err(AdapterError::from)
834 }
835
836 pub async fn check_catalog(&self) -> Result<(), serde_json::Value> {
842 let catalog = self.catalog_snapshot("check_catalog").await;
843 catalog.check_consistency()
844 }
845
846 pub async fn check_coordinator(&self) -> Result<(), serde_json::Value> {
852 self.send_without_session(|tx| Command::CheckConsistency { tx })
853 .await
854 .map_err(|inconsistencies| {
855 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
856 serde_json::Value::String("failed to serialize inconsistencies".to_string())
857 })
858 })
859 }
860
861 pub async fn dump_coordinator_state(&self) -> Result<serde_json::Value, anyhow::Error> {
862 self.send_without_session(|tx| Command::Dump { tx }).await
863 }
864
865 pub fn retire_execute(
868 &self,
869 guard: ExecuteContextGuard,
870 reason: StatementEndedExecutionReason,
871 ) {
872 if !guard.is_trivial() {
873 let data = guard.defuse();
874 let cmd = Command::RetireExecute { data, reason };
875 self.inner().send(cmd);
876 }
877 }
878
879 pub async fn insert_rows(
885 &mut self,
886 target_id: CatalogItemId,
887 target_name: String,
888 columns: Vec<ColumnIndex>,
889 rows: Vec<Row>,
890 ctx_extra: ExecuteContextGuard,
891 ) -> Result<ExecuteResponse, AdapterError> {
892 let pcx = self.session().pcx().clone();
895
896 let session_meta = self.session().meta();
897
898 let catalog = self.catalog_snapshot("insert_rows").await;
899 let conn_catalog = catalog.for_session(self.session());
900 let catalog_state = conn_catalog.state();
901
902 let optimizer_config = optimize::OptimizerConfig::from(conn_catalog.system_vars());
904 let prep = ExprPrepOneShot {
905 logical_time: EvalTime::NotAvailable,
906 session: &session_meta,
907 catalog_state,
908 };
909 let mut optimizer =
910 optimize::view::Optimizer::new_with_prep_no_limit(optimizer_config.clone(), None, prep);
911
912 let result: Result<_, AdapterError> = mz_sql::plan::plan_copy_from(
913 &pcx,
914 &conn_catalog,
915 target_id,
916 target_name,
917 columns,
918 rows,
919 )
920 .err_into()
921 .and_then(|values| optimizer.optimize(values).err_into())
922 .and_then(|values| {
923 Coordinator::insert_constant(&catalog, self.session(), target_id, values.into_inner())
925 });
926 self.retire_execute(ctx_extra, (&result).into());
927 result
928 }
929
930 pub async fn get_system_vars(&self) -> SystemVars {
932 self.inner().get_system_vars().await
933 }
934
935 pub async fn set_system_vars(
937 &mut self,
938 vars: BTreeMap<String, String>,
939 ) -> Result<(), AdapterError> {
940 let conn_id = self.session().conn_id().clone();
941 self.send_without_session(|tx| Command::SetSystemVars { vars, conn_id, tx })
942 .await
943 }
944
945 pub async fn terminate(&mut self) {
947 let conn_id = self.session().conn_id().clone();
948 let res = self
949 .send_without_session(|tx| Command::Terminate {
950 conn_id,
951 tx: Some(tx),
952 })
953 .await;
954 if let Err(e) = res {
955 error!("Unable to terminate session: {e:?}");
957 }
958 self.inner = None;
960 }
961
962 pub fn session(&mut self) -> &mut Session {
964 self.session.as_mut().expect("session invariant violated")
965 }
966
967 pub fn inner(&self) -> &Client {
969 self.inner.as_ref().expect("inner invariant violated")
970 }
971
972 async fn send_without_session<T, F>(&self, f: F) -> T
973 where
974 F: FnOnce(oneshot::Sender<T>) -> Command,
975 {
976 let (tx, rx) = oneshot::channel();
977 self.inner().send(f(tx));
978 rx.await.expect("sender dropped")
979 }
980
981 #[instrument(level = "debug")]
982 async fn send<T, F>(&mut self, f: F) -> Result<T, AdapterError>
983 where
984 F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
985 {
986 self.send_with_cancel(f, futures::future::pending()).await
987 }
988
989 #[instrument(level = "debug")]
993 async fn send_with_cancel<T, F>(
994 &mut self,
995 f: F,
996 cancel_future: impl Future<Output = std::io::Error> + Send,
997 ) -> Result<T, AdapterError>
998 where
999 F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
1000 {
1001 let session = self.session.take().expect("session invariant violated");
1002 let mut typ = None;
1003 let application_name = session.application_name();
1004 let name_hint = ApplicationNameHint::from_str(application_name);
1005 let conn_id = session.conn_id().clone();
1006 let (tx, rx) = oneshot::channel();
1007
1008 let Self {
1011 inner: inner_client,
1012 session: client_session,
1013 ..
1014 } = self;
1015
1016 let inner_client = inner_client.as_ref().expect("inner invariant violated");
1019
1020 let mut guarded_rx = rx.with_guard(|response: Response<_>| {
1026 *client_session = Some(response.session);
1027 });
1028
1029 inner_client.send({
1030 let cmd = f(tx, session);
1031 match cmd {
1035 Command::Execute { .. } => typ = Some("execute"),
1036 Command::GetWebhook { .. } => typ = Some("webhook"),
1037 Command::Startup { .. }
1038 | Command::AuthenticatePassword { .. }
1039 | Command::AuthenticateGetSASLChallenge { .. }
1040 | Command::AuthenticateVerifySASLProof { .. }
1041 | Command::CatalogSnapshot { .. }
1042 | Command::Commit { .. }
1043 | Command::CancelRequest { .. }
1044 | Command::PrivilegedCancelRequest { .. }
1045 | Command::GetSystemVars { .. }
1046 | Command::SetSystemVars { .. }
1047 | Command::Terminate { .. }
1048 | Command::RetireExecute { .. }
1049 | Command::CheckConsistency { .. }
1050 | Command::Dump { .. }
1051 | Command::GetComputeInstanceClient { .. }
1052 | Command::GetOracle { .. }
1053 | Command::DetermineRealTimeRecentTimestamp { .. }
1054 | Command::GetTransactionReadHoldsBundle { .. }
1055 | Command::StoreTransactionReadHolds { .. }
1056 | Command::ExecuteSlowPathPeek { .. }
1057 | Command::CopyToPreflight { .. }
1058 | Command::ExecuteCopyTo { .. }
1059 | Command::ExecuteSideEffectingFunc { .. }
1060 | Command::RegisterFrontendPeek { .. }
1061 | Command::UnregisterFrontendPeek { .. }
1062 | Command::ExplainTimestamp { .. }
1063 | Command::FrontendStatementLogging(..) => {}
1064 };
1065 cmd
1066 });
1067
1068 let mut cancel_future = pin::pin!(cancel_future);
1069 let mut cancelled = false;
1070 loop {
1071 tokio::select! {
1072 res = &mut guarded_rx => {
1073 drop(guarded_rx);
1075
1076 let res = res.expect("sender dropped");
1077 let status = res.result.is_ok().then_some("success").unwrap_or("error");
1078 if let Err(err) = res.result.as_ref() {
1079 if name_hint.should_trace_errors() {
1080 tracing::warn!(?err, ?name_hint, "adapter response error");
1081 }
1082 }
1083
1084 if let Some(typ) = typ {
1085 inner_client
1086 .metrics
1087 .commands
1088 .with_label_values(&[typ, status, name_hint.as_str()])
1089 .inc();
1090 }
1091 *client_session = Some(res.session);
1092 return res.result;
1093 },
1094 _err = &mut cancel_future, if !cancelled => {
1095 cancelled = true;
1096 inner_client.send(Command::PrivilegedCancelRequest {
1097 conn_id: conn_id.clone(),
1098 });
1099 }
1100 };
1101 }
1102 }
1103
1104 pub fn add_idle_in_transaction_session_timeout(&mut self) {
1105 let session = self.session();
1106 let timeout_dur = session.vars().idle_in_transaction_session_timeout();
1107 if !timeout_dur.is_zero() {
1108 let timeout_dur = timeout_dur.clone();
1109 if let Some(txn) = session.transaction().inner() {
1110 let txn_id = txn.id.clone();
1111 let timeout = TimeoutType::IdleInTransactionSession(txn_id);
1112 self.timeouts.add_timeout(timeout, timeout_dur);
1113 }
1114 }
1115 }
1116
1117 pub fn remove_idle_in_transaction_session_timeout(&mut self) {
1118 let session = self.session();
1119 if let Some(txn) = session.transaction().inner() {
1120 let txn_id = txn.id.clone();
1121 self.timeouts
1122 .remove_timeout(&TimeoutType::IdleInTransactionSession(txn_id));
1123 }
1124 }
1125
1126 pub async fn recv_timeout(&mut self) -> Option<TimeoutType> {
1133 self.timeouts.recv().await
1134 }
1135
1136 pub fn peek_client(&self) -> &PeekClient {
1138 &self.peek_client
1139 }
1140
1141 pub fn peek_client_mut(&mut self) -> &mut PeekClient {
1143 &mut self.peek_client
1144 }
1145
1146 pub(crate) async fn try_frontend_peek(
1154 &mut self,
1155 portal_name: &str,
1156 outer_ctx_extra: &mut Option<ExecuteContextGuard>,
1157 ) -> Result<Option<ExecuteResponse>, AdapterError> {
1158 if self.enable_frontend_peek_sequencing {
1159 let session = self.session.as_mut().expect("SessionClient invariant");
1160 self.peek_client
1161 .try_frontend_peek(portal_name, session, outer_ctx_extra)
1162 .await
1163 } else {
1164 Ok(None)
1165 }
1166 }
1167}
1168
1169impl Drop for SessionClient {
1170 fn drop(&mut self) {
1171 if let Some(session) = self.session.take() {
1175 if let Some(inner) = &self.inner {
1178 inner.send(Command::Terminate {
1179 conn_id: session.conn_id().clone(),
1180 tx: None,
1181 })
1182 }
1183 }
1184 }
1185}
1186
1187#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
1188pub enum TimeoutType {
1189 IdleInTransactionSession(TransactionId),
1190}
1191
1192impl Display for TimeoutType {
1193 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1194 match self {
1195 TimeoutType::IdleInTransactionSession(txn_id) => {
1196 writeln!(f, "Idle in transaction session for transaction '{txn_id}'")
1197 }
1198 }
1199 }
1200}
1201
1202impl From<TimeoutType> for AdapterError {
1203 fn from(timeout: TimeoutType) -> Self {
1204 match timeout {
1205 TimeoutType::IdleInTransactionSession(_) => {
1206 AdapterError::IdleInTransactionSessionTimeout
1207 }
1208 }
1209 }
1210}
1211
1212struct Timeout {
1213 tx: mpsc::UnboundedSender<TimeoutType>,
1214 rx: mpsc::UnboundedReceiver<TimeoutType>,
1215 active_timeouts: BTreeMap<TimeoutType, AbortOnDropHandle<()>>,
1216}
1217
1218impl Timeout {
1219 fn new() -> Self {
1220 let (tx, rx) = mpsc::unbounded_channel();
1221 Timeout {
1222 tx,
1223 rx,
1224 active_timeouts: BTreeMap::new(),
1225 }
1226 }
1227
1228 async fn recv(&mut self) -> Option<TimeoutType> {
1237 self.rx.recv().await
1238 }
1239
1240 fn add_timeout(&mut self, timeout: TimeoutType, duration: Duration) {
1241 let tx = self.tx.clone();
1242 let timeout_key = timeout.clone();
1243 let handle = mz_ore::task::spawn(|| format!("{timeout_key}"), async move {
1244 tokio::time::sleep(duration).await;
1245 let _ = tx.send(timeout);
1246 })
1247 .abort_on_drop();
1248 self.active_timeouts.insert(timeout_key, handle);
1249 }
1250
1251 fn remove_timeout(&mut self, timeout: &TimeoutType) {
1252 self.active_timeouts.remove(timeout);
1253
1254 let mut timeouts = Vec::new();
1256 while let Ok(pending_timeout) = self.rx.try_recv() {
1257 if timeout != &pending_timeout {
1258 timeouts.push(pending_timeout);
1259 }
1260 }
1261 for pending_timeout in timeouts {
1262 self.tx.send(pending_timeout).expect("rx is in this struct");
1263 }
1264 }
1265}
1266
1267#[derive(Derivative)]
1271#[derivative(Debug)]
1272pub struct RecordFirstRowStream {
1273 #[derivative(Debug = "ignore")]
1275 pub rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
1276 pub execute_started: Instant,
1278 pub time_to_first_row_seconds: Histogram,
1281 pub saw_rows: bool,
1283 pub recorded_first_row_instant: Option<Instant>,
1285 pub no_more_rows: bool,
1287}
1288
1289impl RecordFirstRowStream {
1290 pub fn new(
1292 rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
1293 execute_started: Instant,
1294 client: &SessionClient,
1295 instance_id: Option<ComputeInstanceId>,
1296 strategy: Option<StatementExecutionStrategy>,
1297 ) -> Self {
1298 let histogram = Self::histogram(client, instance_id, strategy);
1299 Self {
1300 rows,
1301 execute_started,
1302 time_to_first_row_seconds: histogram,
1303 saw_rows: false,
1304 recorded_first_row_instant: None,
1305 no_more_rows: false,
1306 }
1307 }
1308
1309 fn histogram(
1310 client: &SessionClient,
1311 instance_id: Option<ComputeInstanceId>,
1312 strategy: Option<StatementExecutionStrategy>,
1313 ) -> Histogram {
1314 let isolation_level = *client
1315 .session
1316 .as_ref()
1317 .expect("session invariant")
1318 .vars()
1319 .transaction_isolation();
1320 let instance = match instance_id {
1321 Some(i) => Cow::Owned(i.to_string()),
1322 None => Cow::Borrowed("none"),
1323 };
1324 let strategy = match strategy {
1325 Some(s) => s.name(),
1326 None => "none",
1327 };
1328
1329 client
1330 .inner()
1331 .metrics()
1332 .time_to_first_row_seconds
1333 .with_label_values(&[instance.as_ref(), isolation_level.as_str(), strategy])
1334 }
1335
1336 pub fn record(
1339 execute_started: Instant,
1340 client: &SessionClient,
1341 instance_id: Option<ComputeInstanceId>,
1342 strategy: Option<StatementExecutionStrategy>,
1343 ) {
1344 Self::histogram(client, instance_id, strategy)
1345 .observe(execute_started.elapsed().as_secs_f64());
1346 }
1347
1348 pub async fn recv(&mut self) -> Option<PeekResponseUnary> {
1349 let msg = self.rows.next().await;
1350 if !self.saw_rows && matches!(msg, Some(PeekResponseUnary::Rows(_))) {
1351 self.saw_rows = true;
1352 self.time_to_first_row_seconds
1353 .observe(self.execute_started.elapsed().as_secs_f64());
1354 self.recorded_first_row_instant = Some(Instant::now());
1355 }
1356 if msg.is_none() {
1357 self.no_more_rows = true;
1358 }
1359 msg
1360 }
1361}