1use std::borrow::Cow;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Display, Formatter};
13use std::future::Future;
14use std::pin::{self, Pin};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use anyhow::bail;
19use chrono::{DateTime, Utc};
20use derivative::Derivative;
21use futures::{Stream, StreamExt};
22use itertools::Itertools;
23use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
24use mz_auth::password::Password;
25use mz_build_info::BuildInfo;
26use mz_compute_types::ComputeInstanceId;
27use mz_ore::channel::OneshotReceiverExt;
28use mz_ore::collections::CollectionExt;
29use mz_ore::id_gen::{IdAllocator, IdAllocatorInnerBitSet, MAX_ORG_ID, org_id_conn_bits};
30use mz_ore::instrument;
31use mz_ore::now::{EpochMillis, NowFn, to_datetime};
32use mz_ore::result::ResultExt;
33use mz_ore::task::AbortOnDropHandle;
34use mz_ore::thread::JoinOnDropHandle;
35use mz_ore::tracing::OpenTelemetryContext;
36use mz_repr::{CatalogItemId, ColumnIndex, Row, SqlScalarType};
37use mz_sql::ast::{Raw, Statement};
38use mz_sql::catalog::{EnvironmentId, SessionCatalog};
39use mz_sql::session::hint::ApplicationNameHint;
40use mz_sql::session::metadata::SessionMetadata;
41use mz_sql::session::user::SUPPORT_USER;
42use mz_sql::session::vars::{
43 CLUSTER, ENABLE_FRONTEND_PEEK_SEQUENCING, OwnedVarInput, SystemVars, Var,
44};
45use mz_sql_parser::parser::{ParserStatementError, StatementParseResult};
46use prometheus::Histogram;
47use serde_json::json;
48use tokio::sync::{mpsc, oneshot};
49use tracing::{debug, error};
50use uuid::Uuid;
51
52use crate::catalog::Catalog;
53use crate::command::{
54 AuthResponse, CatalogDump, CatalogSnapshot, Command, ExecuteResponse, Response,
55 SASLChallengeResponse, SASLVerifyProofResponse,
56};
57use crate::coord::{Coordinator, ExecuteContextGuard};
58use crate::error::AdapterError;
59use crate::metrics::Metrics;
60use crate::optimize::dataflows::{EvalTime, ExprPrepStyle};
61use crate::optimize::{self, Optimize};
62use crate::session::{
63 EndTransactionAction, PreparedStatement, Session, SessionConfig, StateRevision, TransactionId,
64};
65use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
66use crate::telemetry::{self, EventDetails, SegmentClientExt, StatementFailureType};
67use crate::webhook::AppendWebhookResponse;
68use crate::{AdapterNotice, AppendWebhookError, PeekClient, PeekResponseUnary, StartupResponse};
69
/// Bundles a session identifier, a start instant, and a thread handle.
///
/// NOTE(review): presumably the handle to the running adapter/coordinator
/// thread — confirm against the code that constructs it.
pub struct Handle {
    /// Identifier returned by [`Handle::session_id`].
    pub(crate) session_id: Uuid,
    /// Instant returned by [`Handle::start_instant`].
    pub(crate) start_instant: Instant,
    /// Never read in this file; held only so that it is dropped with the handle.
    // NOTE(review): `JoinOnDropHandle` presumably joins the thread on drop — confirm.
    pub(crate) _thread: JoinOnDropHandle<()>,
}
80
81impl Handle {
82 pub fn session_id(&self) -> Uuid {
88 self.session_id
89 }
90
91 pub fn start_instant(&self) -> Instant {
93 self.start_instant
94 }
95}
96
/// A cloneable client that forwards [`Command`]s over an unbounded channel.
///
/// NOTE(review): the receiving end is presumably the coordinator (the panic
/// messages in this file read "coordinator unexpectedly gone") — confirm.
#[derive(Debug, Clone)]
pub struct Client {
    /// Build information handed to every [`Session`] created by `new_session`.
    build_info: &'static BuildInfo,
    /// Sending half of the command channel; every command travels together
    /// with an [`OpenTelemetryContext`].
    inner_cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
    /// Allocator used by `new_conn_id` to hand out connection IDs.
    id_alloc: IdAllocator<IdAllocatorInnerBitSet>,
    /// Clock function backing `now`.
    now: NowFn,
    /// Adapter metrics; also the source of per-session metrics.
    metrics: Metrics,
    /// Identifier of the environment this client belongs to.
    environment_id: EnvironmentId,
    /// Optional Segment client, cloned into each `SessionClient` at startup.
    segment_client: Option<mz_segment::Client>,
}
114
115impl Client {
116 pub(crate) fn new(
117 build_info: &'static BuildInfo,
118 cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
119 metrics: Metrics,
120 now: NowFn,
121 environment_id: EnvironmentId,
122 segment_client: Option<mz_segment::Client>,
123 ) -> Client {
124 let env_lower = org_id_conn_bits(&environment_id.organization_id());
132 Client {
133 build_info,
134 inner_cmd_tx: cmd_tx,
135 id_alloc: IdAllocator::new(1, MAX_ORG_ID, env_lower),
136 now,
137 metrics,
138 environment_id,
139 segment_client,
140 }
141 }
142
143 pub fn new_conn_id(&self) -> Result<ConnectionId, AdapterError> {
145 self.id_alloc.alloc().ok_or(AdapterError::IdExhaustionError)
146 }
147
148 pub fn new_session(&self, config: SessionConfig) -> Session {
152 Session::new(self.build_info, config, self.metrics().session_metrics())
156 }
157
158 pub async fn authenticate(
160 &self,
161 user: &String,
162 password: &Password,
163 ) -> Result<AuthResponse, AdapterError> {
164 let (tx, rx) = oneshot::channel();
165 self.send(Command::AuthenticatePassword {
166 role_name: user.to_string(),
167 password: Some(password.clone()),
168 tx,
169 });
170 let response = rx.await.expect("sender dropped")?;
171 Ok(response)
172 }
173
174 pub async fn generate_sasl_challenge(
175 &self,
176 user: &String,
177 client_nonce: &String,
178 ) -> Result<SASLChallengeResponse, AdapterError> {
179 let (tx, rx) = oneshot::channel();
180 self.send(Command::AuthenticateGetSASLChallenge {
181 role_name: user.to_string(),
182 nonce: client_nonce.to_string(),
183 tx,
184 });
185 let response = rx.await.expect("sender dropped")?;
186 Ok(response)
187 }
188
189 pub async fn verify_sasl_proof(
190 &self,
191 user: &String,
192 proof: &String,
193 nonce: &String,
194 mock_hash: &String,
195 ) -> Result<SASLVerifyProofResponse, AdapterError> {
196 let (tx, rx) = oneshot::channel();
197 self.send(Command::AuthenticateVerifySASLProof {
198 role_name: user.to_string(),
199 proof: proof.to_string(),
200 auth_message: nonce.to_string(),
201 mock_hash: mock_hash.to_string(),
202 tx,
203 });
204 let response = rx.await.expect("sender dropped")?;
205 Ok(response)
206 }
207
208 #[mz_ore::instrument(level = "debug")]
217 pub async fn startup(&self, session: Session) -> Result<SessionClient, AdapterError> {
218 let user = session.user().clone();
219 let conn_id = session.conn_id().clone();
220 let secret_key = session.secret_key();
221 let uuid = session.uuid();
222 let client_ip = session.client_ip();
223 let application_name = session.application_name().into();
224 let notice_tx = session.retain_notice_transmitter();
225
226 let (tx, rx) = oneshot::channel();
227
228 let rx = rx.with_guard(|_| {
234 self.send(Command::Terminate {
235 conn_id: conn_id.clone(),
236 tx: None,
237 });
238 });
239
240 self.send(Command::Startup {
241 tx,
242 user,
243 conn_id: conn_id.clone(),
244 secret_key,
245 uuid,
246 client_ip: client_ip.copied(),
247 application_name,
248 notice_tx,
249 });
250
251 let response = rx.await.expect("sender dropped")?;
254
255 let StartupResponse {
259 role_id,
260 write_notify,
261 session_defaults,
262 catalog,
263 storage_collections,
264 transient_id_gen,
265 optimizer_metrics,
266 persist_client,
267 statement_logging_frontend,
268 } = response;
269
270 let peek_client = PeekClient::new(
271 self.clone(),
272 storage_collections,
273 transient_id_gen,
274 optimizer_metrics,
275 persist_client,
276 statement_logging_frontend,
277 );
278
279 let mut client = SessionClient {
280 inner: Some(self.clone()),
281 session: Some(session),
282 timeouts: Timeout::new(),
283 environment_id: self.environment_id.clone(),
284 segment_client: self.segment_client.clone(),
285 peek_client,
286 enable_frontend_peek_sequencing: false, };
288
289 let session = client.session();
290 session.initialize_role_metadata(role_id);
291 let vars_mut = session.vars_mut();
292 for (name, val) in session_defaults {
293 if let Err(err) = vars_mut.set_default(&name, val.borrow()) {
294 tracing::error!("failed to set peristed default, {err:?}");
297 }
298 }
299 session
300 .vars_mut()
301 .end_transaction(EndTransactionAction::Commit);
302
303 session.set_builtin_table_updates(write_notify);
311
312 let catalog = catalog.for_session(session);
313
314 let cluster_active = session.vars().cluster().to_string();
315 if session.vars().welcome_message() {
316 let cluster_info = if catalog.resolve_cluster(Some(&cluster_active)).is_err() {
317 format!("{cluster_active} (does not exist)")
318 } else {
319 cluster_active.to_string()
320 };
321
322 session.add_notice(AdapterNotice::Welcome(format!(
326 "connected to Materialize v{}
327 Org ID: {}
328 Region: {}
329 User: {}
330 Cluster: {}
331 Database: {}
332 {}
333 Session UUID: {}
334
335Issue a SQL query to get started. Need help?
336 View documentation: https://materialize.com/s/docs
337 Join our Slack community: https://materialize.com/s/chat
338 ",
339 session.vars().build_info().semver_version(),
340 self.environment_id.organization_id(),
341 self.environment_id.region(),
342 session.vars().user().name,
343 cluster_info,
344 session.vars().database(),
345 match session.vars().search_path() {
346 [schema] => format!("Schema: {}", schema),
347 schemas => format!(
348 "Search path: {}",
349 schemas.iter().map(|id| id.to_string()).join(", ")
350 ),
351 },
352 session.uuid(),
353 )));
354 }
355
356 if session.vars().current_object_missing_warnings() {
357 if catalog.active_database().is_none() {
358 let db = session.vars().database().into();
359 session.add_notice(AdapterNotice::UnknownSessionDatabase(db));
360 }
361 }
362
363 let cluster_var = session
366 .vars()
367 .inspect(CLUSTER.name())
368 .expect("cluster should exist");
369 if session.vars().current_object_missing_warnings()
370 && catalog.resolve_cluster(Some(&cluster_active)).is_err()
371 {
372 let cluster_notice = 'notice: {
373 if cluster_var.inspect_session_value().is_some() {
374 break 'notice Some(AdapterNotice::DefaultClusterDoesNotExist {
375 name: cluster_active,
376 kind: "session",
377 suggested_action: "Pick an extant cluster with SET CLUSTER = name. Run SHOW CLUSTERS to see available clusters.".into(),
378 });
379 }
380
381 let role_default = catalog.get_role(catalog.active_role_id());
382 let role_cluster = match role_default.vars().get(CLUSTER.name()) {
383 Some(OwnedVarInput::Flat(name)) => Some(name),
384 None => None,
385 Some(v @ OwnedVarInput::SqlSet(_)) => {
387 tracing::warn!(?v, "SqlSet found for cluster Role Default");
388 break 'notice None;
389 }
390 };
391
392 let alter_role = "with `ALTER ROLE <role> SET cluster TO <cluster>;`";
393 match role_cluster {
394 None => Some(AdapterNotice::DefaultClusterDoesNotExist {
396 name: cluster_active,
397 kind: "system",
398 suggested_action: format!(
399 "Set a default cluster for the current role {alter_role}."
400 ),
401 }),
402 Some(_) => Some(AdapterNotice::DefaultClusterDoesNotExist {
404 name: cluster_active,
405 kind: "role",
406 suggested_action: format!(
407 "Change the default cluster for the current role {alter_role}."
408 ),
409 }),
410 }
411 };
412
413 if let Some(notice) = cluster_notice {
414 session.add_notice(notice);
415 }
416 }
417
418 client.enable_frontend_peek_sequencing = ENABLE_FRONTEND_PEEK_SEQUENCING
419 .require(catalog.system_vars())
420 .is_ok();
421
422 Ok(client)
423 }
424
425 pub fn cancel_request(&self, conn_id: ConnectionIdType, secret_key: u32) {
427 self.send(Command::CancelRequest {
428 conn_id,
429 secret_key,
430 });
431 }
432
433 pub async fn support_execute_one(
436 &self,
437 sql: &str,
438 ) -> Result<Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send>>, anyhow::Error> {
439 let conn_id = self.new_conn_id()?;
441 let session = self.new_session(SessionConfig {
442 conn_id,
443 uuid: Uuid::new_v4(),
444 user: SUPPORT_USER.name.clone(),
445 client_ip: None,
446 external_metadata_rx: None,
447 internal_user_metadata: None,
448 helm_chart_version: None,
449 });
450 let mut session_client = self.startup(session).await?;
451
452 let stmts = mz_sql::parse::parse(sql)?;
454 if stmts.len() != 1 {
455 bail!("must supply exactly one query");
456 }
457 let StatementParseResult { ast: stmt, sql } = stmts.into_element();
458
459 const EMPTY_PORTAL: &str = "";
460 session_client.start_transaction(Some(1))?;
461 session_client
462 .declare(EMPTY_PORTAL.into(), stmt, sql.to_string())
463 .await?;
464
465 match session_client
466 .execute(EMPTY_PORTAL.into(), futures::future::pending(), None)
467 .await?
468 {
469 (ExecuteResponse::SendingRowsStreaming { mut rows, .. }, _) => {
470 let owning_response_stream = async_stream::stream! {
475 while let Some(rows) = rows.next().await {
476 yield rows;
477 }
478 drop(session_client);
479 };
480 Ok(Box::pin(owning_response_stream))
481 }
482 r => bail!("unsupported response type: {r:?}"),
483 }
484 }
485
486 pub fn metrics(&self) -> &Metrics {
488 &self.metrics
489 }
490
491 pub fn now(&self) -> DateTime<Utc> {
493 to_datetime((self.now)())
494 }
495
496 pub async fn get_webhook_appender(
498 &self,
499 database: String,
500 schema: String,
501 name: String,
502 ) -> Result<AppendWebhookResponse, AppendWebhookError> {
503 let (tx, rx) = oneshot::channel();
504
505 self.send(Command::GetWebhook {
507 database,
508 schema,
509 name,
510 tx,
511 });
512
513 let response = rx
515 .await
516 .map_err(|_| anyhow::anyhow!("failed to receive webhook response"))?;
517
518 response
519 }
520
521 pub async fn get_system_vars(&self) -> SystemVars {
523 let (tx, rx) = oneshot::channel();
524 self.send(Command::GetSystemVars { tx });
525 rx.await.expect("coordinator unexpectedly gone")
526 }
527
528 #[instrument(level = "debug")]
529 pub(crate) fn send(&self, cmd: Command) {
530 self.inner_cmd_tx
531 .send((OpenTelemetryContext::obtain(), cmd))
532 .expect("coordinator unexpectedly gone");
533 }
534}
535
/// A client bound to one [`Session`], created by `Client::startup`.
pub struct SessionClient {
    /// The underlying [`Client`]. Set to `None` by `terminate`; `inner()`
    /// panics afterwards.
    inner: Option<Client>,
    /// The session owned by this client. Temporarily taken out while a
    /// session-owning command is in flight (see `send_with_cancel`) and put
    /// back when the response arrives.
    session: Option<Session>,
    /// Pending session timeouts (see the `*_idle_in_transaction_session_timeout`
    /// methods and `recv_timeout`).
    timeouts: Timeout,
    /// Optional Segment client used to report statement parse failures.
    segment_client: Option<mz_segment::Client>,
    /// Environment identifier attached to telemetry events.
    environment_id: EnvironmentId,
    /// Client used by `try_frontend_peek`.
    peek_client: PeekClient,
    /// Whether `try_frontend_peek` attempts a frontend peek at all. Set at
    /// startup from the `ENABLE_FRONTEND_PEEK_SEQUENCING` system variable.
    pub enable_frontend_peek_sequencing: bool,
}
558
559impl SessionClient {
560 pub fn parse<'a>(
563 &self,
564 sql: &'a str,
565 ) -> Result<Result<Vec<StatementParseResult<'a>>, ParserStatementError>, String> {
566 match mz_sql::parse::parse_with_limit(sql) {
567 Ok(Err(e)) => {
568 self.track_statement_parse_failure(&e);
569 Ok(Err(e))
570 }
571 r => r,
572 }
573 }
574
575 fn track_statement_parse_failure(&self, parse_error: &ParserStatementError) {
576 let session = self.session.as_ref().expect("session invariant violated");
577 let Some(user_id) = session.user().external_metadata.as_ref().map(|m| m.user_id) else {
578 return;
579 };
580 let Some(segment_client) = &self.segment_client else {
581 return;
582 };
583 let Some(statement_kind) = parse_error.statement else {
584 return;
585 };
586 let Some((action, object_type)) = telemetry::analyze_audited_statement(statement_kind)
587 else {
588 return;
589 };
590 let event_type = StatementFailureType::ParseFailure;
591 let event_name = format!(
592 "{} {} {}",
593 object_type.as_title_case(),
594 action.as_title_case(),
595 event_type.as_title_case(),
596 );
597 segment_client.environment_track(
598 &self.environment_id,
599 event_name,
600 json!({
601 "statement_kind": statement_kind,
602 "error": &parse_error.error,
603 }),
604 EventDetails {
605 user_id: Some(user_id),
606 application_name: Some(session.application_name()),
607 ..Default::default()
608 },
609 );
610 }
611
612 pub async fn get_prepared_statement(
615 &mut self,
616 name: &str,
617 ) -> Result<&PreparedStatement, AdapterError> {
618 let catalog = self.catalog_snapshot("get_prepared_statement").await;
619 Coordinator::verify_prepared_statement(&catalog, self.session(), name)?;
620 Ok(self
621 .session()
622 .get_prepared_statement_unverified(name)
623 .expect("must exist"))
624 }
625
626 pub async fn prepare(
631 &mut self,
632 name: String,
633 stmt: Option<Statement<Raw>>,
634 sql: String,
635 param_types: Vec<Option<SqlScalarType>>,
636 ) -> Result<(), AdapterError> {
637 let catalog = self.catalog_snapshot("prepare").await;
638
639 let mut async_pause = false;
642 (|| {
643 fail::fail_point!("async_prepare", |val| {
644 async_pause = val.map_or(false, |val| val.parse().unwrap_or(false))
645 });
646 })();
647 if async_pause {
648 tokio::time::sleep(Duration::from_secs(1)).await;
649 };
650
651 let desc = Coordinator::describe(&catalog, self.session(), stmt.clone(), param_types)?;
652 let now = self.now();
653 let state_revision = StateRevision {
654 catalog_revision: catalog.transient_revision(),
655 session_state_revision: self.session().state_revision(),
656 };
657 self.session()
658 .set_prepared_statement(name, stmt, sql, desc, state_revision, now);
659 Ok(())
660 }
661
662 #[mz_ore::instrument(level = "debug")]
664 pub async fn declare(
665 &mut self,
666 name: String,
667 stmt: Statement<Raw>,
668 sql: String,
669 ) -> Result<(), AdapterError> {
670 let catalog = self.catalog_snapshot("declare").await;
671 let param_types = vec![];
672 let desc =
673 Coordinator::describe(&catalog, self.session(), Some(stmt.clone()), param_types)?;
674 let params = vec![];
675 let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
676 let now = self.now();
677 let logging = self.session().mint_logging(sql, Some(&stmt), now);
678 let state_revision = StateRevision {
679 catalog_revision: catalog.transient_revision(),
680 session_state_revision: self.session().state_revision(),
681 };
682 self.session().set_portal(
683 name,
684 desc,
685 Some(stmt),
686 logging,
687 params,
688 result_formats,
689 state_revision,
690 )?;
691 Ok(())
692 }
693
694 #[mz_ore::instrument(level = "debug")]
701 pub async fn execute(
702 &mut self,
703 portal_name: String,
704 cancel_future: impl Future<Output = std::io::Error> + Send,
705 outer_ctx_extra: Option<ExecuteContextGuard>,
706 ) -> Result<(ExecuteResponse, Instant), AdapterError> {
707 let execute_started = Instant::now();
708
709 let mut outer_ctx_extra = outer_ctx_extra;
713 if let Some(resp) = self
714 .try_frontend_peek(&portal_name, &mut outer_ctx_extra)
715 .await?
716 {
717 debug!("frontend peek succeeded");
718 return Ok((resp, execute_started));
721 } else {
722 debug!("frontend peek did not happen, falling back to `Command::Execute`");
723 }
728
729 let response = self
730 .send_with_cancel(
731 |tx, session| Command::Execute {
732 portal_name,
733 session,
734 tx,
735 outer_ctx_extra,
736 },
737 cancel_future,
738 )
739 .await?;
740 Ok((response, execute_started))
741 }
742
743 fn now(&self) -> EpochMillis {
744 (self.inner().now)()
745 }
746
747 fn now_datetime(&self) -> DateTime<Utc> {
748 to_datetime(self.now())
749 }
750
751 pub fn start_transaction(&mut self, implicit: Option<usize>) -> Result<(), AdapterError> {
757 let now = self.now_datetime();
758 let session = self.session.as_mut().expect("session invariant violated");
759 let result = match implicit {
760 None => session.start_transaction(now, None, None),
761 Some(stmts) => {
762 session.start_transaction_implicit(now, stmts);
763 Ok(())
764 }
765 };
766 result
767 }
768
769 #[instrument(level = "debug")]
772 pub async fn end_transaction(
773 &mut self,
774 action: EndTransactionAction,
775 ) -> Result<ExecuteResponse, AdapterError> {
776 let res = self
777 .send(|tx, session| Command::Commit {
778 action,
779 session,
780 tx,
781 })
782 .await;
783 let _ = self.session().clear_transaction();
787 res
788 }
789
790 pub fn fail_transaction(&mut self) {
792 let session = self.session.take().expect("session invariant violated");
793 let session = session.fail_transaction();
794 self.session = Some(session);
795 }
796
797 #[instrument(level = "debug")]
799 pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
800 let start = std::time::Instant::now();
801 let CatalogSnapshot { catalog } = self
802 .send_without_session(|tx| Command::CatalogSnapshot { tx })
803 .await;
804 self.inner()
805 .metrics()
806 .catalog_snapshot_seconds
807 .with_label_values(&[context])
808 .observe(start.elapsed().as_secs_f64());
809 catalog
810 }
811
812 pub async fn dump_catalog(&self) -> Result<CatalogDump, AdapterError> {
817 let catalog = self.catalog_snapshot("dump_catalog").await;
818 catalog.dump().map_err(AdapterError::from)
819 }
820
821 pub async fn check_catalog(&self) -> Result<(), serde_json::Value> {
827 let catalog = self.catalog_snapshot("check_catalog").await;
828 catalog.check_consistency()
829 }
830
831 pub async fn check_coordinator(&self) -> Result<(), serde_json::Value> {
837 self.send_without_session(|tx| Command::CheckConsistency { tx })
838 .await
839 .map_err(|inconsistencies| {
840 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
841 serde_json::Value::String("failed to serialize inconsistencies".to_string())
842 })
843 })
844 }
845
846 pub async fn dump_coordinator_state(&self) -> Result<serde_json::Value, anyhow::Error> {
847 self.send_without_session(|tx| Command::Dump { tx }).await
848 }
849
850 pub fn retire_execute(
853 &self,
854 guard: ExecuteContextGuard,
855 reason: StatementEndedExecutionReason,
856 ) {
857 if !guard.is_trivial() {
858 let data = guard.defuse();
859 let cmd = Command::RetireExecute { data, reason };
860 self.inner().send(cmd);
861 }
862 }
863
    /// Plans, optimizes, and inserts `rows` into the object `target_id`.
    ///
    /// The rows are planned with `mz_sql::plan::plan_copy_from`, optimized
    /// with a view optimizer, and handed to `Coordinator::insert_constant`.
    /// `ctx_extra` is retired with a reason derived from the outcome before
    /// the outcome is returned.
    ///
    /// # Errors
    ///
    /// Returns the first error produced by planning, optimization, or the
    /// insert itself.
    pub async fn insert_rows(
        &mut self,
        target_id: CatalogItemId,
        target_name: String,
        columns: Vec<ColumnIndex>,
        rows: Vec<Row>,
        ctx_extra: ExecuteContextGuard,
    ) -> Result<ExecuteResponse, AdapterError> {
        let pcx = self.session().pcx().clone();

        // Captured up front; `prep` below borrows it for the optimizer's lifetime.
        let session_meta = self.session().meta();

        let catalog = self.catalog_snapshot("insert_rows").await;
        let conn_catalog = catalog.for_session(self.session());
        let catalog_state = conn_catalog.state();

        let optimizer_config = optimize::OptimizerConfig::from(conn_catalog.system_vars());
        // One-shot expression preparation with no logical time available.
        let prep = ExprPrepStyle::OneShot {
            logical_time: EvalTime::NotAvailable,
            session: &session_meta,
            catalog_state,
        };
        let mut optimizer =
            optimize::view::Optimizer::new_with_prep_no_limit(optimizer_config.clone(), None, prep);

        // Plan -> optimize -> insert; the chain stops at the first error.
        let result: Result<_, AdapterError> = mz_sql::plan::plan_copy_from(
            &pcx,
            &conn_catalog,
            target_id,
            target_name,
            columns,
            rows,
        )
        .err_into()
        .and_then(|values| optimizer.optimize(values).err_into())
        .and_then(|values| {
            Coordinator::insert_constant(&catalog, self.session(), target_id, values.into_inner())
        });
        // Retire the execute context with a reason derived from the result.
        self.retire_execute(ctx_extra, (&result).into());
        result
    }
914
915 pub async fn get_system_vars(&self) -> SystemVars {
917 self.inner().get_system_vars().await
918 }
919
920 pub async fn set_system_vars(
922 &mut self,
923 vars: BTreeMap<String, String>,
924 ) -> Result<(), AdapterError> {
925 let conn_id = self.session().conn_id().clone();
926 self.send_without_session(|tx| Command::SetSystemVars { vars, conn_id, tx })
927 .await
928 }
929
930 pub async fn terminate(&mut self) {
932 let conn_id = self.session().conn_id().clone();
933 let res = self
934 .send_without_session(|tx| Command::Terminate {
935 conn_id,
936 tx: Some(tx),
937 })
938 .await;
939 if let Err(e) = res {
940 error!("Unable to terminate session: {e:?}");
942 }
943 self.inner = None;
945 }
946
947 pub fn session(&mut self) -> &mut Session {
949 self.session.as_mut().expect("session invariant violated")
950 }
951
952 pub fn inner(&self) -> &Client {
954 self.inner.as_ref().expect("inner invariant violated")
955 }
956
957 async fn send_without_session<T, F>(&self, f: F) -> T
958 where
959 F: FnOnce(oneshot::Sender<T>) -> Command,
960 {
961 let (tx, rx) = oneshot::channel();
962 self.inner().send(f(tx));
963 rx.await.expect("sender dropped")
964 }
965
966 #[instrument(level = "debug")]
967 async fn send<T, F>(&mut self, f: F) -> Result<T, AdapterError>
968 where
969 F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
970 {
971 self.send_with_cancel(f, futures::future::pending()).await
972 }
973
    /// Sends a session-owning command and waits for its response, issuing a
    /// cancel request if `cancel_future` resolves first.
    ///
    /// The session is moved into the command built by `f` and is stored back
    /// into this client from the response. When `cancel_future` resolves, a
    /// single `Command::PrivilegedCancelRequest` is sent and the method keeps
    /// waiting for the response.
    ///
    /// For `Execute` and `GetWebhook` commands the `commands` metric is
    /// incremented, labelled with the command type, "success"/"error", and
    /// the application name hint.
    ///
    /// # Panics
    ///
    /// Panics if the session or inner client is missing, or if the response
    /// sender is dropped without replying.
    #[instrument(level = "debug")]
    async fn send_with_cancel<T, F>(
        &mut self,
        f: F,
        cancel_future: impl Future<Output = std::io::Error> + Send,
    ) -> Result<T, AdapterError>
    where
        F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
    {
        let session = self.session.take().expect("session invariant violated");
        let mut typ = None;
        let application_name = session.application_name();
        let name_hint = ApplicationNameHint::from_str(application_name);
        let conn_id = session.conn_id().clone();
        let (tx, rx) = oneshot::channel();

        // Split `self` into disjoint borrows so the guard closure below can
        // hold `client_session` mutably while `inner_client` is still in use.
        let Self {
            inner: inner_client,
            session: client_session,
            ..
        } = self;

        let inner_client = inner_client.as_ref().expect("inner invariant violated");

        // NOTE(review): `with_guard` presumably runs the closure when a
        // response is produced but never consumed (e.g. this future is
        // dropped); the closure then restores the session from that
        // response — confirm against `OneshotReceiverExt`.
        let mut guarded_rx = rx.with_guard(|response: Response<_>| {
            *client_session = Some(response.session);
        });

        inner_client.send({
            let cmd = f(tx, session);
            // Only `Execute` and `GetWebhook` get a metric label. The match
            // lists every variant explicitly, so adding a `Command` variant
            // fails to compile here until it is classified.
            match cmd {
                Command::Execute { .. } => typ = Some("execute"),
                Command::GetWebhook { .. } => typ = Some("webhook"),
                Command::Startup { .. }
                | Command::AuthenticatePassword { .. }
                | Command::AuthenticateGetSASLChallenge { .. }
                | Command::AuthenticateVerifySASLProof { .. }
                | Command::CatalogSnapshot { .. }
                | Command::Commit { .. }
                | Command::CancelRequest { .. }
                | Command::PrivilegedCancelRequest { .. }
                | Command::GetSystemVars { .. }
                | Command::SetSystemVars { .. }
                | Command::Terminate { .. }
                | Command::RetireExecute { .. }
                | Command::CheckConsistency { .. }
                | Command::Dump { .. }
                | Command::GetComputeInstanceClient { .. }
                | Command::GetOracle { .. }
                | Command::DetermineRealTimeRecentTimestamp { .. }
                | Command::GetTransactionReadHoldsBundle { .. }
                | Command::StoreTransactionReadHolds { .. }
                | Command::ExecuteSlowPathPeek { .. }
                | Command::CopyToPreflight { .. }
                | Command::ExecuteCopyTo { .. }
                | Command::ExecuteSideEffectingFunc { .. }
                | Command::RegisterFrontendPeek { .. }
                | Command::UnregisterFrontendPeek { .. }
                | Command::ExplainTimestamp { .. }
                | Command::FrontendStatementLogging(..) => {}
            };
            cmd
        });

        let mut cancel_future = pin::pin!(cancel_future);
        // Ensures the cancel branch fires at most once.
        let mut cancelled = false;
        loop {
            tokio::select! {
                res = &mut guarded_rx => {
                    // Release the guard (and its borrow of `client_session`)
                    // before the session is stored back below.
                    drop(guarded_rx);

                    let res = res.expect("sender dropped");
                    let status = res.result.is_ok().then_some("success").unwrap_or("error");
                    if let Err(err) = res.result.as_ref() {
                        if name_hint.should_trace_errors() {
                            tracing::warn!(?err, ?name_hint, "adapter response error");
                        }
                    }

                    if let Some(typ) = typ {
                        inner_client
                            .metrics
                            .commands
                            .with_label_values(&[typ, status, name_hint.as_str()])
                            .inc();
                    }
                    *client_session = Some(res.session);
                    return res.result;
                },
                _err = &mut cancel_future, if !cancelled => {
                    cancelled = true;
                    inner_client.send(Command::PrivilegedCancelRequest {
                        conn_id: conn_id.clone(),
                    });
                }
            };
        }
    }
1088
1089 pub fn add_idle_in_transaction_session_timeout(&mut self) {
1090 let session = self.session();
1091 let timeout_dur = session.vars().idle_in_transaction_session_timeout();
1092 if !timeout_dur.is_zero() {
1093 let timeout_dur = timeout_dur.clone();
1094 if let Some(txn) = session.transaction().inner() {
1095 let txn_id = txn.id.clone();
1096 let timeout = TimeoutType::IdleInTransactionSession(txn_id);
1097 self.timeouts.add_timeout(timeout, timeout_dur);
1098 }
1099 }
1100 }
1101
1102 pub fn remove_idle_in_transaction_session_timeout(&mut self) {
1103 let session = self.session();
1104 if let Some(txn) = session.transaction().inner() {
1105 let txn_id = txn.id.clone();
1106 self.timeouts
1107 .remove_timeout(&TimeoutType::IdleInTransactionSession(txn_id));
1108 }
1109 }
1110
1111 pub async fn recv_timeout(&mut self) -> Option<TimeoutType> {
1118 self.timeouts.recv().await
1119 }
1120
1121 pub fn peek_client(&self) -> &PeekClient {
1123 &self.peek_client
1124 }
1125
1126 pub fn peek_client_mut(&mut self) -> &mut PeekClient {
1128 &mut self.peek_client
1129 }
1130
1131 pub(crate) async fn try_frontend_peek(
1139 &mut self,
1140 portal_name: &str,
1141 outer_ctx_extra: &mut Option<ExecuteContextGuard>,
1142 ) -> Result<Option<ExecuteResponse>, AdapterError> {
1143 if self.enable_frontend_peek_sequencing {
1144 let session = self.session.as_mut().expect("SessionClient invariant");
1145 self.peek_client
1146 .try_frontend_peek(portal_name, session, outer_ctx_extra)
1147 .await
1148 } else {
1149 Ok(None)
1150 }
1151 }
1152}
1153
1154impl Drop for SessionClient {
1155 fn drop(&mut self) {
1156 if let Some(session) = self.session.take() {
1160 if let Some(inner) = &self.inner {
1163 inner.send(Command::Terminate {
1164 conn_id: session.conn_id().clone(),
1165 tx: None,
1166 })
1167 }
1168 }
1169 }
1170}
1171
/// The kinds of timeout a [`SessionClient`] can arm.
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
pub enum TimeoutType {
    /// Idle-in-transaction timeout, keyed by the ID of the transaction it was
    /// armed for.
    IdleInTransactionSession(TransactionId),
}
1176
1177impl Display for TimeoutType {
1178 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1179 match self {
1180 TimeoutType::IdleInTransactionSession(txn_id) => {
1181 writeln!(f, "Idle in transaction session for transaction '{txn_id}'")
1182 }
1183 }
1184 }
1185}
1186
1187impl From<TimeoutType> for AdapterError {
1188 fn from(timeout: TimeoutType) -> Self {
1189 match timeout {
1190 TimeoutType::IdleInTransactionSession(_) => {
1191 AdapterError::IdleInTransactionSessionTimeout
1192 }
1193 }
1194 }
1195}
1196
/// Tracks armed timeouts and delivers the ones that fire over an internal
/// channel.
struct Timeout {
    /// Sender cloned into every timeout task; also used to re-queue drained
    /// messages in `remove_timeout`.
    tx: mpsc::UnboundedSender<TimeoutType>,
    /// Receiver on which fired timeouts arrive.
    rx: mpsc::UnboundedReceiver<TimeoutType>,
    /// Task handles of the armed timeouts, keyed by timeout.
    // NOTE(review): `AbortOnDropHandle` presumably aborts the task when the
    // handle is dropped, which is how removal cancels a timeout — confirm.
    active_timeouts: BTreeMap<TimeoutType, AbortOnDropHandle<()>>,
}
1202
1203impl Timeout {
1204 fn new() -> Self {
1205 let (tx, rx) = mpsc::unbounded_channel();
1206 Timeout {
1207 tx,
1208 rx,
1209 active_timeouts: BTreeMap::new(),
1210 }
1211 }
1212
1213 async fn recv(&mut self) -> Option<TimeoutType> {
1222 self.rx.recv().await
1223 }
1224
1225 fn add_timeout(&mut self, timeout: TimeoutType, duration: Duration) {
1226 let tx = self.tx.clone();
1227 let timeout_key = timeout.clone();
1228 let handle = mz_ore::task::spawn(|| format!("{timeout_key}"), async move {
1229 tokio::time::sleep(duration).await;
1230 let _ = tx.send(timeout);
1231 })
1232 .abort_on_drop();
1233 self.active_timeouts.insert(timeout_key, handle);
1234 }
1235
1236 fn remove_timeout(&mut self, timeout: &TimeoutType) {
1237 self.active_timeouts.remove(timeout);
1238
1239 let mut timeouts = Vec::new();
1241 while let Ok(pending_timeout) = self.rx.try_recv() {
1242 if timeout != &pending_timeout {
1243 timeouts.push(pending_timeout);
1244 }
1245 }
1246 for pending_timeout in timeouts {
1247 self.tx.send(pending_timeout).expect("rx is in this struct");
1248 }
1249 }
1250}
1251
/// A row stream wrapper that records how long the first batch of rows took
/// to arrive.
#[derive(Derivative)]
#[derivative(Debug)]
pub struct RecordFirstRowStream {
    /// The wrapped stream of peek responses; excluded from the `Debug` output.
    #[derivative(Debug = "ignore")]
    pub rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
    /// Instant from which the time to the first row is measured.
    pub execute_started: Instant,
    /// Histogram that receives the time-to-first-row observation.
    pub time_to_first_row_seconds: Histogram,
    /// Set once `recv` has seen a `PeekResponseUnary::Rows` message.
    pub saw_rows: bool,
    /// Instant taken when the first-row observation was recorded, if any.
    pub recorded_first_row_instant: Option<Instant>,
    /// Set once the wrapped stream has yielded `None`.
    pub no_more_rows: bool,
}
1273
1274impl RecordFirstRowStream {
1275 pub fn new(
1277 rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
1278 execute_started: Instant,
1279 client: &SessionClient,
1280 instance_id: Option<ComputeInstanceId>,
1281 strategy: Option<StatementExecutionStrategy>,
1282 ) -> Self {
1283 let histogram = Self::histogram(client, instance_id, strategy);
1284 Self {
1285 rows,
1286 execute_started,
1287 time_to_first_row_seconds: histogram,
1288 saw_rows: false,
1289 recorded_first_row_instant: None,
1290 no_more_rows: false,
1291 }
1292 }
1293
1294 fn histogram(
1295 client: &SessionClient,
1296 instance_id: Option<ComputeInstanceId>,
1297 strategy: Option<StatementExecutionStrategy>,
1298 ) -> Histogram {
1299 let isolation_level = *client
1300 .session
1301 .as_ref()
1302 .expect("session invariant")
1303 .vars()
1304 .transaction_isolation();
1305 let instance = match instance_id {
1306 Some(i) => Cow::Owned(i.to_string()),
1307 None => Cow::Borrowed("none"),
1308 };
1309 let strategy = match strategy {
1310 Some(s) => s.name(),
1311 None => "none",
1312 };
1313
1314 client
1315 .inner()
1316 .metrics()
1317 .time_to_first_row_seconds
1318 .with_label_values(&[instance.as_ref(), isolation_level.as_str(), strategy])
1319 }
1320
1321 pub fn record(
1324 execute_started: Instant,
1325 client: &SessionClient,
1326 instance_id: Option<ComputeInstanceId>,
1327 strategy: Option<StatementExecutionStrategy>,
1328 ) {
1329 Self::histogram(client, instance_id, strategy)
1330 .observe(execute_started.elapsed().as_secs_f64());
1331 }
1332
1333 pub async fn recv(&mut self) -> Option<PeekResponseUnary> {
1334 let msg = self.rows.next().await;
1335 if !self.saw_rows && matches!(msg, Some(PeekResponseUnary::Rows(_))) {
1336 self.saw_rows = true;
1337 self.time_to_first_row_seconds
1338 .observe(self.execute_started.elapsed().as_secs_f64());
1339 self.recorded_first_row_instant = Some(Instant::now());
1340 }
1341 if msg.is_none() {
1342 self.no_more_rows = true;
1343 }
1344 msg
1345 }
1346}