1use std::borrow::Cow;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Display, Formatter};
13use std::future::Future;
14use std::pin::{self, Pin};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use anyhow::bail;
19use chrono::{DateTime, Utc};
20use derivative::Derivative;
21use futures::{Stream, StreamExt};
22use itertools::Itertools;
23use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
24use mz_auth::password::Password;
25use mz_auth::{Authenticated, AuthenticatorKind};
26use mz_build_info::BuildInfo;
27use mz_compute_types::ComputeInstanceId;
28use mz_ore::channel::OneshotReceiverExt;
29use mz_ore::collections::CollectionExt;
30use mz_ore::id_gen::{IdAllocator, IdAllocatorInnerBitSet, MAX_ORG_ID, org_id_conn_bits};
31use mz_ore::instrument;
32use mz_ore::now::{EpochMillis, NowFn, to_datetime};
33use mz_ore::str::StrExt;
34use mz_ore::task::AbortOnDropHandle;
35use mz_ore::thread::JoinOnDropHandle;
36use mz_ore::tracing::OpenTelemetryContext;
37use mz_repr::user::InternalUserMetadata;
38use mz_repr::{CatalogItemId, ColumnIndex, SqlScalarType};
39use mz_sql::ast::{Raw, Statement};
40use mz_sql::catalog::{EnvironmentId, SessionCatalog};
41use mz_sql::session::hint::ApplicationNameHint;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_sql::session::user::SUPPORT_USER;
44use mz_sql::session::vars::{
45 CLUSTER, ENABLE_FRONTEND_PEEK_SEQUENCING, OwnedVarInput, SystemVars, Var,
46};
47use mz_sql_parser::parser::{ParserStatementError, StatementParseResult};
48use prometheus::Histogram;
49use serde_json::json;
50use tokio::sync::{mpsc, oneshot};
51use tracing::{debug, error};
52use uuid::Uuid;
53
54use crate::catalog::Catalog;
55use crate::command::{
56 CatalogDump, CatalogSnapshot, Command, CopyFromStdinWriter, ExecuteResponse, Response,
57 SASLChallengeResponse, SASLVerifyProofResponse, SuperuserAttribute,
58};
59use crate::coord::{Coordinator, ExecuteContextGuard};
60use crate::error::AdapterError;
61use crate::metrics::{self, Metrics};
62use crate::session::{
63 EndTransactionAction, PreparedStatement, Session, SessionConfig, StateRevision, TransactionId,
64};
65use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
66use crate::telemetry::{self, EventDetails, SegmentClientExt, StatementFailureType};
67use crate::webhook::AppendWebhookResponse;
68use crate::{AdapterNotice, AppendWebhookError, PeekClient, PeekResponseUnary, StartupResponse};
69
70pub struct Handle {
76 pub(crate) session_id: Uuid,
77 pub(crate) start_instant: Instant,
78 pub(crate) _thread: JoinOnDropHandle<()>,
79}
80
81impl Handle {
82 pub fn session_id(&self) -> Uuid {
88 self.session_id
89 }
90
91 pub fn start_instant(&self) -> Instant {
93 self.start_instant
94 }
95}
96
97#[derive(Debug, Clone)]
105pub struct Client {
106 build_info: &'static BuildInfo,
107 inner_cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
108 id_alloc: IdAllocator<IdAllocatorInnerBitSet>,
109 now: NowFn,
110 metrics: Metrics,
111 environment_id: EnvironmentId,
112 segment_client: Option<mz_segment::Client>,
113}
114
115impl Client {
116 pub(crate) fn new(
117 build_info: &'static BuildInfo,
118 cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
119 metrics: Metrics,
120 now: NowFn,
121 environment_id: EnvironmentId,
122 segment_client: Option<mz_segment::Client>,
123 ) -> Client {
124 let env_lower = org_id_conn_bits(&environment_id.organization_id());
132 Client {
133 build_info,
134 inner_cmd_tx: cmd_tx,
135 id_alloc: IdAllocator::new(1, MAX_ORG_ID, env_lower),
136 now,
137 metrics,
138 environment_id,
139 segment_client,
140 }
141 }
142
143 pub fn new_conn_id(&self) -> Result<ConnectionId, AdapterError> {
145 self.id_alloc.alloc().ok_or(AdapterError::IdExhaustionError)
146 }
147
148 pub fn new_session(&self, config: SessionConfig, _authenticated: Authenticated) -> Session {
154 Session::new(self.build_info, config, self.metrics().session_metrics())
158 }
159
160 pub async fn authenticate(
164 &self,
165 user: &String,
166 password: &Password,
167 ) -> Result<Authenticated, AdapterError> {
168 let (tx, rx) = oneshot::channel();
169 self.send(Command::AuthenticatePassword {
170 role_name: user.to_string(),
171 password: Some(password.clone()),
172 tx,
173 });
174 rx.await.expect("sender dropped")?;
175 Ok(Authenticated)
176 }
177
178 pub async fn generate_sasl_challenge(
181 &self,
182 user: &String,
183 client_nonce: &String,
184 ) -> Result<SASLChallengeResponse, AdapterError> {
185 let (tx, rx) = oneshot::channel();
186 self.send(Command::AuthenticateGetSASLChallenge {
187 role_name: user.to_string(),
188 nonce: client_nonce.to_string(),
189 tx,
190 });
191 let response = rx.await.expect("sender dropped")?;
192 Ok(response)
193 }
194
195 pub async fn verify_sasl_proof(
198 &self,
199 user: &String,
200 proof: &String,
201 nonce: &String,
202 mock_hash: &String,
203 ) -> Result<(SASLVerifyProofResponse, Authenticated), AdapterError> {
204 let (tx, rx) = oneshot::channel();
205 self.send(Command::AuthenticateVerifySASLProof {
206 role_name: user.to_string(),
207 proof: proof.to_string(),
208 auth_message: nonce.to_string(),
209 mock_hash: mock_hash.to_string(),
210 tx,
211 });
212 let response = rx.await.expect("sender dropped")?;
213 Ok((response, Authenticated))
214 }
215
216 pub async fn role_can_login(&self, role_name: &str) -> Result<(), AdapterError> {
218 let (tx, rx) = oneshot::channel();
219 self.send(Command::CheckRoleCanLogin {
220 role_name: role_name.to_string(),
221 tx,
222 });
223 rx.await.expect("sender dropped")
224 }
225
226 #[mz_ore::instrument(level = "debug")]
235 pub async fn startup(&self, session: Session) -> Result<SessionClient, AdapterError> {
236 let user = session.user().clone();
237 let conn_id = session.conn_id().clone();
238 let secret_key = session.secret_key();
239 let uuid = session.uuid();
240 let client_ip = session.client_ip();
241 let application_name = session.application_name().into();
242 let notice_tx = session.retain_notice_transmitter();
243
244 let (tx, rx) = oneshot::channel();
245
246 let rx = rx.with_guard(|_| {
252 self.send(Command::Terminate {
253 conn_id: conn_id.clone(),
254 tx: None,
255 });
256 });
257
258 self.send(Command::Startup {
259 tx,
260 user,
261 conn_id: conn_id.clone(),
262 secret_key,
263 uuid,
264 client_ip: client_ip.copied(),
265 application_name,
266 notice_tx,
267 });
268
269 let response = rx.await.expect("sender dropped")?;
272
273 let StartupResponse {
277 role_id,
278 write_notify,
279 session_defaults,
280 catalog,
281 storage_collections,
282 transient_id_gen,
283 optimizer_metrics,
284 persist_client,
285 statement_logging_frontend,
286 superuser_attribute,
287 } = response;
288
289 let peek_client = PeekClient::new(
290 self.clone(),
291 storage_collections,
292 transient_id_gen,
293 optimizer_metrics,
294 persist_client,
295 statement_logging_frontend,
296 );
297
298 let mut client = SessionClient {
299 inner: Some(self.clone()),
300 session: Some(session),
301 timeouts: Timeout::new(),
302 environment_id: self.environment_id.clone(),
303 segment_client: self.segment_client.clone(),
304 peek_client,
305 enable_frontend_peek_sequencing: false, };
307
308 let session = client.session();
309
310 if let SuperuserAttribute(Some(superuser)) = superuser_attribute {
313 session.apply_internal_user_metadata(InternalUserMetadata { superuser });
314 }
315
316 session.initialize_role_metadata(role_id);
317 let vars_mut = session.vars_mut();
318 for (name, val) in session_defaults {
319 if let Err(err) = vars_mut.set_default(&name, val.borrow()) {
320 tracing::error!("failed to set peristed default, {err:?}");
323 }
324 }
325 session
326 .vars_mut()
327 .end_transaction(EndTransactionAction::Commit);
328
329 session.set_builtin_table_updates(write_notify);
337
338 let catalog = catalog.for_session(session);
339
340 let cluster_active = session.vars().cluster().to_string();
341 if session.vars().welcome_message() {
342 let cluster_info = if catalog.resolve_cluster(Some(&cluster_active)).is_err() {
343 format!("{cluster_active} (does not exist)")
344 } else {
345 cluster_active.to_string()
346 };
347
348 session.add_notice(AdapterNotice::Welcome(format!(
352 "connected to Materialize v{}
353 Environment ID: {}
354 Region: {}
355 User: {}
356 Cluster: {}
357 Database: {}
358 {}
359 Session UUID: {}
360
361Issue a SQL query to get started. Need help?
362 View documentation: https://materialize.com/s/docs
363 Join our Slack community: https://materialize.com/s/chat
364 ",
365 session.vars().build_info().semver_version(),
366 self.environment_id,
367 self.environment_id.region(),
368 session.vars().user().name,
369 cluster_info,
370 session.vars().database(),
371 match session.vars().search_path() {
372 [schema] => format!("Schema: {}", schema),
373 schemas => format!(
374 "Search path: {}",
375 schemas.iter().map(|id| id.to_string()).join(", ")
376 ),
377 },
378 session.uuid(),
379 )));
380 }
381
382 if session.vars().current_object_missing_warnings() {
383 if catalog.active_database().is_none() {
384 let db = session.vars().database().into();
385 session.add_notice(AdapterNotice::UnknownSessionDatabase(db));
386 }
387 }
388
389 let cluster_var = session
392 .vars()
393 .inspect(CLUSTER.name())
394 .expect("cluster should exist");
395 if session.vars().current_object_missing_warnings()
396 && catalog.resolve_cluster(Some(&cluster_active)).is_err()
397 {
398 let cluster_notice = 'notice: {
399 if cluster_var.inspect_session_value().is_some() {
400 break 'notice Some(AdapterNotice::DefaultClusterDoesNotExist {
401 name: cluster_active,
402 kind: "session",
403 suggested_action: "Pick an extant cluster with SET CLUSTER = name. Run SHOW CLUSTERS to see available clusters.".into(),
404 });
405 }
406
407 let role_default = catalog.get_role(catalog.active_role_id());
408 let role_cluster = match role_default.vars().get(CLUSTER.name()) {
409 Some(OwnedVarInput::Flat(name)) => Some(name),
410 None => None,
411 Some(v @ OwnedVarInput::SqlSet(_)) => {
413 tracing::warn!(?v, "SqlSet found for cluster Role Default");
414 break 'notice None;
415 }
416 };
417
418 let alter_role = "with `ALTER ROLE <role> SET cluster TO <cluster>;`";
419 match role_cluster {
420 None => Some(AdapterNotice::DefaultClusterDoesNotExist {
422 name: cluster_active,
423 kind: "system",
424 suggested_action: format!(
425 "Set a default cluster for the current role {alter_role}."
426 ),
427 }),
428 Some(_) => Some(AdapterNotice::DefaultClusterDoesNotExist {
430 name: cluster_active,
431 kind: "role",
432 suggested_action: format!(
433 "Change the default cluster for the current role {alter_role}."
434 ),
435 }),
436 }
437 };
438
439 if let Some(notice) = cluster_notice {
440 session.add_notice(notice);
441 }
442 }
443
444 client.enable_frontend_peek_sequencing = ENABLE_FRONTEND_PEEK_SEQUENCING
445 .require(catalog.system_vars())
446 .is_ok();
447
448 Ok(client)
449 }
450
451 pub fn cancel_request(&self, conn_id: ConnectionIdType, secret_key: u32) {
453 self.send(Command::CancelRequest {
454 conn_id,
455 secret_key,
456 });
457 }
458
459 pub async fn support_execute_one(
462 &self,
463 sql: &str,
464 ) -> Result<Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send>>, anyhow::Error> {
465 let conn_id = self.new_conn_id()?;
467 let session = self.new_session(
468 SessionConfig {
469 conn_id,
470 uuid: Uuid::new_v4(),
471 user: SUPPORT_USER.name.clone(),
472 client_ip: None,
473 external_metadata_rx: None,
474 helm_chart_version: None,
475 authenticator_kind: AuthenticatorKind::None,
476 groups: None,
477 },
478 Authenticated,
479 );
480 let mut session_client = self.startup(session).await?;
481
482 let stmts = mz_sql::parse::parse(sql)?;
484 if stmts.len() != 1 {
485 bail!("must supply exactly one query");
486 }
487 let StatementParseResult { ast: stmt, sql } = stmts.into_element();
488
489 const EMPTY_PORTAL: &str = "";
490 session_client.start_transaction(Some(1))?;
491 session_client
492 .declare(EMPTY_PORTAL.into(), stmt, sql.to_string())
493 .await?;
494
495 let execute_result = session_client
496 .execute(EMPTY_PORTAL.into(), futures::future::pending(), None)
497 .await?;
498 match execute_result {
499 (ExecuteResponse::SendingRowsStreaming { mut rows, .. }, _) => {
500 let owning_response_stream = async_stream::stream! {
505 while let Some(rows) = rows.next().await {
506 yield rows;
507 }
508 drop(session_client);
509 };
510 Ok(Box::pin(owning_response_stream))
511 }
512 r => bail!("unsupported response type: {r:?}"),
513 }
514 }
515
516 pub fn metrics(&self) -> &Metrics {
518 &self.metrics
519 }
520
521 pub fn now(&self) -> DateTime<Utc> {
523 to_datetime((self.now)())
524 }
525
526 pub async fn get_webhook_appender(
528 &self,
529 database: String,
530 schema: String,
531 name: String,
532 ) -> Result<AppendWebhookResponse, AppendWebhookError> {
533 let (tx, rx) = oneshot::channel();
534
535 self.send(Command::GetWebhook {
537 database,
538 schema,
539 name,
540 tx,
541 });
542
543 let response = rx
545 .await
546 .map_err(|_| anyhow::anyhow!("failed to receive webhook response"))?;
547
548 response
549 }
550
551 pub async fn get_system_vars(&self) -> SystemVars {
553 let (tx, rx) = oneshot::channel();
554 self.send(Command::GetSystemVars { tx });
555 rx.await.expect("coordinator unexpectedly gone")
556 }
557
558 #[instrument(level = "debug")]
559 pub(crate) fn send(&self, cmd: Command) {
560 self.inner_cmd_tx
561 .send((OpenTelemetryContext::obtain(), cmd))
562 .expect("coordinator unexpectedly gone");
563 }
564}
565
566pub struct SessionClient {
570 inner: Option<Client>,
574 session: Option<Session>,
577 timeouts: Timeout,
578 segment_client: Option<mz_segment::Client>,
579 environment_id: EnvironmentId,
580 peek_client: PeekClient,
582 pub enable_frontend_peek_sequencing: bool,
587}
588
589impl SessionClient {
590 pub fn parse<'a>(
593 &self,
594 sql: &'a str,
595 ) -> Result<Result<Vec<StatementParseResult<'a>>, ParserStatementError>, String> {
596 match mz_sql::parse::parse_with_limit(sql) {
597 Ok(Err(e)) => {
598 self.track_statement_parse_failure(&e);
599 Ok(Err(e))
600 }
601 r => r,
602 }
603 }
604
605 fn track_statement_parse_failure(&self, parse_error: &ParserStatementError) {
606 let session = self.session.as_ref().expect("session invariant violated");
607 let Some(user_id) = session.user().external_metadata.as_ref().map(|m| m.user_id) else {
608 return;
609 };
610 let Some(segment_client) = &self.segment_client else {
611 return;
612 };
613 let Some(statement_kind) = parse_error.statement else {
614 return;
615 };
616 let Some((action, object_type)) = telemetry::analyze_audited_statement(statement_kind)
617 else {
618 return;
619 };
620 let event_type = StatementFailureType::ParseFailure;
621 let event_name = format!(
622 "{} {} {}",
623 object_type.as_title_case(),
624 action.as_title_case(),
625 event_type.as_title_case(),
626 );
627 segment_client.environment_track(
628 &self.environment_id,
629 event_name,
630 json!({
631 "statement_kind": statement_kind,
632 "error": &parse_error.error,
633 }),
634 EventDetails {
635 user_id: Some(user_id),
636 application_name: Some(session.application_name()),
637 ..Default::default()
638 },
639 );
640 }
641
642 pub async fn get_prepared_statement(
645 &mut self,
646 name: &str,
647 ) -> Result<&PreparedStatement, AdapterError> {
648 let catalog = self.catalog_snapshot("get_prepared_statement").await;
649 Coordinator::verify_prepared_statement(&catalog, self.session(), name)?;
650 Ok(self
651 .session()
652 .get_prepared_statement_unverified(name)
653 .expect("must exist"))
654 }
655
656 pub async fn prepare(
661 &mut self,
662 name: String,
663 stmt: Option<Statement<Raw>>,
664 sql: String,
665 param_types: Vec<Option<SqlScalarType>>,
666 ) -> Result<(), AdapterError> {
667 let catalog = self.catalog_snapshot("prepare").await;
668
669 let mut async_pause = false;
672 (|| {
673 fail::fail_point!("async_prepare", |val| {
674 async_pause = val.map_or(false, |val| val.parse().unwrap_or(false))
675 });
676 })();
677 if async_pause {
678 tokio::time::sleep(Duration::from_secs(1)).await;
679 };
680
681 let desc = Coordinator::describe(&catalog, self.session(), stmt.clone(), param_types)?;
682 let now = self.now();
683 let state_revision = StateRevision {
684 catalog_revision: catalog.transient_revision(),
685 session_state_revision: self.session().state_revision(),
686 };
687 self.session()
688 .set_prepared_statement(name, stmt, sql, desc, state_revision, now);
689 Ok(())
690 }
691
692 #[mz_ore::instrument(level = "debug")]
694 pub async fn declare(
695 &mut self,
696 name: String,
697 stmt: Statement<Raw>,
698 sql: String,
699 ) -> Result<(), AdapterError> {
700 let catalog = self.catalog_snapshot("declare").await;
701 let param_types = vec![];
702 let desc =
703 Coordinator::describe(&catalog, self.session(), Some(stmt.clone()), param_types)?;
704 let params = vec![];
705 let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
706 let now = self.now();
707 let logging = self.session().mint_logging(sql, Some(&stmt), now);
708 let state_revision = StateRevision {
709 catalog_revision: catalog.transient_revision(),
710 session_state_revision: self.session().state_revision(),
711 };
712 self.session().set_portal(
713 name,
714 desc,
715 Some(stmt),
716 logging,
717 params,
718 result_formats,
719 state_revision,
720 )?;
721 Ok(())
722 }
723
724 #[mz_ore::instrument(level = "debug")]
731 pub async fn execute(
732 &mut self,
733 portal_name: String,
734 cancel_future: impl Future<Output = std::io::Error> + Send,
735 outer_ctx_extra: Option<ExecuteContextGuard>,
736 ) -> Result<(ExecuteResponse, Instant), AdapterError> {
737 let execute_started = Instant::now();
738
739 let mut outer_ctx_extra = outer_ctx_extra;
740
741 let (portal_name, catalog) = self
755 .unroll_sql_execute(portal_name, &mut outer_ctx_extra)
756 .await?;
757
758 let peek_result = self
762 .try_frontend_peek(&portal_name, catalog, &mut outer_ctx_extra)
763 .await?;
764 if let Some(resp) = peek_result {
765 debug!("frontend peek succeeded");
766 return Ok((resp, execute_started));
769 } else {
770 debug!("frontend peek did not happen, falling back to `Command::Execute`");
771 }
776
777 let response = self
778 .send_with_cancel(
779 |tx, session| Command::Execute {
780 portal_name,
781 session,
782 tx,
783 outer_ctx_extra,
784 },
785 cancel_future,
786 )
787 .await?;
788 Ok((response, execute_started))
789 }
790
791 async fn unroll_sql_execute(
807 &mut self,
808 portal_name: String,
809 outer_ctx_extra: &mut Option<ExecuteContextGuard>,
810 ) -> Result<(String, Option<Arc<Catalog>>), AdapterError> {
811 let (stmt, params, outer_logging, outer_lifecycle_timestamps) = {
812 let session = self.session.as_ref().expect("SessionClient invariant");
813 let portal = match session.get_portal_unverified(&portal_name) {
814 Some(p) => p,
815 None => return Ok((portal_name, None)),
818 };
819 match &portal.stmt {
820 Some(stmt) => (
821 Arc::clone(stmt),
822 portal.parameters.clone(),
823 Arc::clone(&portal.logging),
824 portal.lifecycle_timestamps.clone(),
825 ),
826 None => return Ok((portal_name, None)),
827 }
828 };
829
830 if !matches!(&*stmt, Statement::Execute(_)) {
833 return Ok((portal_name, None));
834 }
835
836 let catalog = self.catalog_snapshot("unroll_sql_execute").await;
837
838 {
842 let session = self.session.as_mut().expect("SessionClient invariant");
843 Coordinator::verify_portal(&catalog, session, &portal_name)?;
844 }
845
846 {
850 let session = self.session.as_ref().expect("SessionClient invariant");
851 session
852 .metrics()
853 .query_total(&[
854 metrics::session_type_label_value(session.user()),
855 metrics::statement_type_label_value(&stmt),
856 ])
857 .inc();
858 }
859
860 let began_outer_logging = outer_ctx_extra.is_none();
870 let logging_id: Option<crate::statement_logging::StatementLoggingId> =
871 if began_outer_logging {
872 let session = self.session.as_mut().expect("SessionClient invariant");
873 let result = self
874 .peek_client
875 .statement_logging_frontend
876 .begin_statement_execution(
877 session,
878 ¶ms,
879 &outer_logging,
880 catalog.system_config(),
881 outer_lifecycle_timestamps,
882 );
883 if let Some((id, began_execution, mseh_update, prepared_statement)) = result {
884 self.peek_client.log_began_execution(
885 began_execution,
886 mseh_update,
887 prepared_statement,
888 );
889 Some(id)
890 } else {
891 None
892 }
893 } else {
894 None
895 };
896
897 let new_portal_name = match self.install_inner_portal_for_execute(&catalog, &stmt, ¶ms)
898 {
899 Ok(name) => name,
900 Err(err) => {
901 if let Some(id) = logging_id {
902 self.peek_client.log_ended_execution(
903 id,
904 StatementEndedExecutionReason::Errored {
905 error: err.to_string(),
906 },
907 );
908 }
909 return Err(err);
910 }
911 };
912
913 if began_outer_logging {
925 let (dummy_tx, _dummy_rx) = mpsc::unbounded_channel();
933 *outer_ctx_extra = Some(ExecuteContextGuard::new(logging_id, dummy_tx));
934 }
935
936 Ok((new_portal_name, Some(catalog)))
937 }
938
939 fn install_inner_portal_for_execute(
948 &mut self,
949 catalog: &Arc<Catalog>,
950 stmt: &Arc<Statement<Raw>>,
951 params: &mz_sql::plan::Params,
952 ) -> Result<String, AdapterError> {
953 use mz_sql::plan::Plan;
954
955 let execute_plan = {
956 let session = self.session.as_mut().expect("SessionClient invariant");
957 let conn_catalog = catalog.for_session(session);
958 let (resolved_stmt, resolved_ids) =
959 mz_sql::names::resolve(&conn_catalog, (**stmt).clone())?;
960 let pcx = session.pcx();
961 let plan = mz_sql::plan::plan(
962 Some(pcx),
963 &conn_catalog,
964 resolved_stmt,
965 params,
966 &resolved_ids,
967 )?;
968 match plan {
969 Plan::Execute(plan) => plan,
970 other => {
971 return Err(AdapterError::Internal(format!(
975 "planning Statement::Execute yielded unexpected plan: {:?}",
976 mz_sql::plan::PlanKind::from(&other),
977 )));
978 }
979 }
980 };
981
982 let session = self.session.as_mut().expect("SessionClient invariant");
988 Coordinator::verify_prepared_statement(catalog, session, &execute_plan.name)?;
989 let ps = session
990 .get_prepared_statement_unverified(&execute_plan.name)
991 .expect("verified above");
992 let inner_stmt = ps.stmt().cloned();
993 let inner_desc = ps.desc().clone();
994 let state_revision = ps.state_revision;
995 let inner_logging = Arc::clone(ps.logging());
996
997 if let Some(inner) = inner_stmt.as_ref() {
1002 if matches!(inner, Statement::Execute(_)) {
1003 return Err(AdapterError::Internal(format!(
1004 "nested EXECUTE: prepared statement {} resolves to another EXECUTE; \
1005 parser should reject `PREPARE ... AS EXECUTE ...`",
1006 execute_plan.name.quoted(),
1007 )));
1008 }
1009 }
1010
1011 session.create_new_portal(
1012 inner_stmt,
1013 inner_logging,
1014 inner_desc,
1015 execute_plan.params,
1016 Vec::new(),
1017 state_revision,
1018 )
1019 }
1020
1021 fn now(&self) -> EpochMillis {
1022 (self.inner().now)()
1023 }
1024
1025 fn now_datetime(&self) -> DateTime<Utc> {
1026 to_datetime(self.now())
1027 }
1028
1029 pub fn start_transaction(&mut self, implicit: Option<usize>) -> Result<(), AdapterError> {
1035 let now = self.now_datetime();
1036 let session = self.session.as_mut().expect("session invariant violated");
1037 let result = match implicit {
1038 None => session.start_transaction(now, None, None),
1039 Some(stmts) => {
1040 session.start_transaction_implicit(now, stmts);
1041 Ok(())
1042 }
1043 };
1044 result
1045 }
1046
1047 #[instrument(level = "debug")]
1050 pub async fn end_transaction(
1051 &mut self,
1052 action: EndTransactionAction,
1053 ) -> Result<ExecuteResponse, AdapterError> {
1054 let res = self
1055 .send(|tx, session| Command::Commit {
1056 action,
1057 session,
1058 tx,
1059 })
1060 .await;
1061 let _ = self.session().clear_transaction();
1065 res
1066 }
1067
1068 pub fn fail_transaction(&mut self) {
1070 let session = self.session.take().expect("session invariant violated");
1071 let session = session.fail_transaction();
1072 self.session = Some(session);
1073 }
1074
1075 #[instrument(level = "debug")]
1077 pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
1078 let start = std::time::Instant::now();
1079 let CatalogSnapshot { catalog } = self
1080 .send_without_session(|tx| Command::CatalogSnapshot { tx })
1081 .await;
1082 self.inner()
1083 .metrics()
1084 .catalog_snapshot_seconds
1085 .with_label_values(&[context])
1086 .observe(start.elapsed().as_secs_f64());
1087 catalog
1088 }
1089
1090 pub async fn dump_catalog(&self) -> Result<CatalogDump, AdapterError> {
1095 let catalog = self.catalog_snapshot("dump_catalog").await;
1096 catalog.dump().map_err(AdapterError::from)
1097 }
1098
1099 pub async fn check_catalog(&self) -> Result<(), serde_json::Value> {
1105 let catalog = self.catalog_snapshot("check_catalog").await;
1106 catalog.check_consistency()
1107 }
1108
1109 pub async fn check_coordinator(&self) -> Result<(), serde_json::Value> {
1115 self.send_without_session(|tx| Command::CheckConsistency { tx })
1116 .await
1117 .map_err(|inconsistencies| {
1118 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1119 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1120 })
1121 })
1122 }
1123
1124 pub async fn dump_coordinator_state(&self) -> Result<serde_json::Value, anyhow::Error> {
1125 self.send_without_session(|tx| Command::Dump { tx }).await
1126 }
1127
1128 pub fn retire_execute(
1131 &self,
1132 guard: ExecuteContextGuard,
1133 reason: StatementEndedExecutionReason,
1134 ) {
1135 if !guard.is_trivial() {
1136 let data = guard.defuse();
1137 let cmd = Command::RetireExecute { data, reason };
1138 self.inner().send(cmd);
1139 }
1140 }
1141
1142 pub async fn start_copy_from_stdin(
1148 &mut self,
1149 target_id: CatalogItemId,
1150 target_name: String,
1151 columns: Vec<ColumnIndex>,
1152 row_desc: mz_repr::RelationDesc,
1153 params: mz_pgcopy::CopyFormatParams<'static>,
1154 ) -> Result<CopyFromStdinWriter, AdapterError> {
1155 self.send(|tx, session| Command::StartCopyFromStdin {
1156 target_id,
1157 target_name,
1158 columns,
1159 row_desc,
1160 params,
1161 session,
1162 tx,
1163 })
1164 .await
1165 }
1166
1167 pub fn stage_copy_from_stdin_batches(
1172 &mut self,
1173 target_id: CatalogItemId,
1174 batches: Vec<mz_persist_client::batch::ProtoBatch>,
1175 ) -> Result<(), AdapterError> {
1176 use crate::session::{TransactionOps, WriteOp};
1177 use mz_storage_client::client::TableData;
1178
1179 self.session()
1180 .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
1181 id: target_id,
1182 rows: TableData::Batches(batches.into()),
1183 }]))?;
1184 Ok(())
1185 }
1186
1187 pub async fn get_system_vars(&self) -> SystemVars {
1189 self.inner().get_system_vars().await
1190 }
1191
1192 pub async fn set_system_vars(
1194 &mut self,
1195 vars: BTreeMap<String, String>,
1196 ) -> Result<(), AdapterError> {
1197 let conn_id = self.session().conn_id().clone();
1198 self.send_without_session(|tx| Command::SetSystemVars { vars, conn_id, tx })
1199 .await
1200 }
1201
1202 pub async fn inject_audit_events(
1207 &mut self,
1208 events: Vec<crate::catalog::InjectedAuditEvent>,
1209 ) -> Result<(), AdapterError> {
1210 let conn_id = self.session().conn_id().clone();
1211 self.send_without_session(|tx| Command::InjectAuditEvents {
1212 events,
1213 conn_id,
1214 tx,
1215 })
1216 .await
1217 }
1218
1219 pub async fn terminate(&mut self) {
1221 let conn_id = self.session().conn_id().clone();
1222 let res = self
1223 .send_without_session(|tx| Command::Terminate {
1224 conn_id,
1225 tx: Some(tx),
1226 })
1227 .await;
1228 if let Err(e) = res {
1229 error!("Unable to terminate session: {e:?}");
1231 }
1232 self.inner = None;
1234 }
1235
1236 pub fn session(&mut self) -> &mut Session {
1238 self.session.as_mut().expect("session invariant violated")
1239 }
1240
1241 pub fn inner(&self) -> &Client {
1243 self.inner.as_ref().expect("inner invariant violated")
1244 }
1245
1246 async fn send_without_session<T, F>(&self, f: F) -> T
1247 where
1248 F: FnOnce(oneshot::Sender<T>) -> Command,
1249 {
1250 let (tx, rx) = oneshot::channel();
1251 self.inner().send(f(tx));
1252 rx.await.expect("sender dropped")
1253 }
1254
1255 #[instrument(level = "debug")]
1256 async fn send<T, F>(&mut self, f: F) -> Result<T, AdapterError>
1257 where
1258 F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
1259 {
1260 self.send_with_cancel(f, futures::future::pending()).await
1261 }
1262
1263 #[instrument(level = "debug")]
1267 async fn send_with_cancel<T, F>(
1268 &mut self,
1269 f: F,
1270 cancel_future: impl Future<Output = std::io::Error> + Send,
1271 ) -> Result<T, AdapterError>
1272 where
1273 F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
1274 {
1275 let session = self.session.take().expect("session invariant violated");
1276 let mut typ = None;
1277 let application_name = session.application_name();
1278 let name_hint = ApplicationNameHint::from_str(application_name);
1279 let conn_id = session.conn_id().clone();
1280 let (tx, rx) = oneshot::channel();
1281
1282 let Self {
1285 inner: inner_client,
1286 session: client_session,
1287 ..
1288 } = self;
1289
1290 let inner_client = inner_client.as_ref().expect("inner invariant violated");
1293
1294 let mut guarded_rx = rx.with_guard(|response: Response<_>| {
1300 *client_session = Some(response.session);
1301 });
1302
1303 inner_client.send({
1304 let cmd = f(tx, session);
1305 match cmd {
1309 Command::Execute { .. } => typ = Some("execute"),
1310 Command::GetWebhook { .. } => typ = Some("webhook"),
1311 Command::StartCopyFromStdin { .. }
1312 | Command::Startup { .. }
1313 | Command::AuthenticatePassword { .. }
1314 | Command::AuthenticateGetSASLChallenge { .. }
1315 | Command::AuthenticateVerifySASLProof { .. }
1316 | Command::CheckRoleCanLogin { .. }
1317 | Command::CatalogSnapshot { .. }
1318 | Command::Commit { .. }
1319 | Command::CancelRequest { .. }
1320 | Command::PrivilegedCancelRequest { .. }
1321 | Command::GetSystemVars { .. }
1322 | Command::SetSystemVars { .. }
1323 | Command::Terminate { .. }
1324 | Command::RetireExecute { .. }
1325 | Command::CheckConsistency { .. }
1326 | Command::Dump { .. }
1327 | Command::GetComputeInstanceClient { .. }
1328 | Command::GetOracle { .. }
1329 | Command::DetermineRealTimeRecentTimestamp { .. }
1330 | Command::GetTransactionReadHoldsBundle { .. }
1331 | Command::StoreTransactionReadHolds { .. }
1332 | Command::ExecuteSlowPathPeek { .. }
1333 | Command::ExecuteSubscribe { .. }
1334 | Command::CopyToPreflight { .. }
1335 | Command::ExecuteCopyTo { .. }
1336 | Command::ExecuteSideEffectingFunc { .. }
1337 | Command::RegisterFrontendPeek { .. }
1338 | Command::UnregisterFrontendPeek { .. }
1339 | Command::ExplainTimestamp { .. }
1340 | Command::FrontendStatementLogging(..)
1341 | Command::InjectAuditEvents { .. } => {}
1342 };
1343 cmd
1344 });
1345
1346 let mut cancel_future = pin::pin!(cancel_future);
1347 let mut cancelled = false;
1348 loop {
1349 tokio::select! {
1350 res = &mut guarded_rx => {
1351 drop(guarded_rx);
1353
1354 let res = res.expect("sender dropped");
1355 let status = res.result.is_ok().then_some("success").unwrap_or("error");
1356 if let Err(err) = res.result.as_ref() {
1357 if name_hint.should_trace_errors() {
1358 tracing::warn!(?err, ?name_hint, "adapter response error");
1359 }
1360 }
1361
1362 if let Some(typ) = typ {
1363 inner_client
1364 .metrics
1365 .commands
1366 .with_label_values(&[typ, status, name_hint.as_str()])
1367 .inc();
1368 }
1369 *client_session = Some(res.session);
1370 return res.result;
1371 },
1372 _err = &mut cancel_future, if !cancelled => {
1373 cancelled = true;
1374 inner_client.send(Command::PrivilegedCancelRequest {
1375 conn_id: conn_id.clone(),
1376 });
1377 }
1378 };
1379 }
1380 }
1381
1382 pub fn add_idle_in_transaction_session_timeout(&mut self) {
1383 let session = self.session();
1384 let timeout_dur = session.vars().idle_in_transaction_session_timeout();
1385 if !timeout_dur.is_zero() {
1386 let timeout_dur = timeout_dur.clone();
1387 if let Some(txn) = session.transaction().inner() {
1388 let txn_id = txn.id.clone();
1389 let timeout = TimeoutType::IdleInTransactionSession(txn_id);
1390 self.timeouts.add_timeout(timeout, timeout_dur);
1391 }
1392 }
1393 }
1394
1395 pub fn remove_idle_in_transaction_session_timeout(&mut self) {
1396 let session = self.session();
1397 if let Some(txn) = session.transaction().inner() {
1398 let txn_id = txn.id.clone();
1399 self.timeouts
1400 .remove_timeout(&TimeoutType::IdleInTransactionSession(txn_id));
1401 }
1402 }
1403
1404 pub async fn recv_timeout(&mut self) -> Option<TimeoutType> {
1411 self.timeouts.recv().await
1412 }
1413
1414 pub(crate) async fn try_frontend_peek(
1422 &mut self,
1423 portal_name: &str,
1424 catalog: Option<Arc<Catalog>>,
1425 outer_ctx_extra: &mut Option<ExecuteContextGuard>,
1426 ) -> Result<Option<ExecuteResponse>, AdapterError> {
1427 if self.enable_frontend_peek_sequencing {
1428 let session = self.session.as_mut().expect("SessionClient invariant");
1429 self.peek_client
1430 .try_frontend_peek(portal_name, catalog, session, outer_ctx_extra)
1431 .await
1432 } else {
1433 Ok(None)
1434 }
1435 }
1436}
1437
1438impl Drop for SessionClient {
1439 fn drop(&mut self) {
1440 if let Some(session) = self.session.take() {
1444 if let Some(inner) = &self.inner {
1447 inner.send(Command::Terminate {
1448 conn_id: session.conn_id().clone(),
1449 tx: None,
1450 })
1451 }
1452 }
1453 }
1454}
1455
1456#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
1457pub enum TimeoutType {
1458 IdleInTransactionSession(TransactionId),
1459}
1460
1461impl Display for TimeoutType {
1462 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1463 match self {
1464 TimeoutType::IdleInTransactionSession(txn_id) => {
1465 writeln!(f, "Idle in transaction session for transaction '{txn_id}'")
1466 }
1467 }
1468 }
1469}
1470
1471impl From<TimeoutType> for AdapterError {
1472 fn from(timeout: TimeoutType) -> Self {
1473 match timeout {
1474 TimeoutType::IdleInTransactionSession(_) => {
1475 AdapterError::IdleInTransactionSessionTimeout
1476 }
1477 }
1478 }
1479}
1480
1481struct Timeout {
1482 tx: mpsc::UnboundedSender<TimeoutType>,
1483 rx: mpsc::UnboundedReceiver<TimeoutType>,
1484 active_timeouts: BTreeMap<TimeoutType, AbortOnDropHandle<()>>,
1485}
1486
1487impl Timeout {
1488 fn new() -> Self {
1489 let (tx, rx) = mpsc::unbounded_channel();
1490 Timeout {
1491 tx,
1492 rx,
1493 active_timeouts: BTreeMap::new(),
1494 }
1495 }
1496
1497 async fn recv(&mut self) -> Option<TimeoutType> {
1506 self.rx.recv().await
1507 }
1508
1509 fn add_timeout(&mut self, timeout: TimeoutType, duration: Duration) {
1510 let tx = self.tx.clone();
1511 let timeout_key = timeout.clone();
1512 let handle = mz_ore::task::spawn(|| format!("{timeout_key}"), async move {
1513 tokio::time::sleep(duration).await;
1514 let _ = tx.send(timeout);
1515 })
1516 .abort_on_drop();
1517 self.active_timeouts.insert(timeout_key, handle);
1518 }
1519
1520 fn remove_timeout(&mut self, timeout: &TimeoutType) {
1521 self.active_timeouts.remove(timeout);
1522
1523 let mut timeouts = Vec::new();
1525 while let Ok(pending_timeout) = self.rx.try_recv() {
1526 if timeout != &pending_timeout {
1527 timeouts.push(pending_timeout);
1528 }
1529 }
1530 for pending_timeout in timeouts {
1531 self.tx.send(pending_timeout).expect("rx is in this struct");
1532 }
1533 }
1534}
1535
1536#[derive(Derivative)]
1540#[derivative(Debug)]
1541pub struct RecordFirstRowStream {
1542 #[derivative(Debug = "ignore")]
1544 pub rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
1545 pub execute_started: Instant,
1547 pub time_to_first_row_seconds: Histogram,
1550 pub saw_rows: bool,
1552 pub recorded_first_row_instant: Option<Instant>,
1554 pub no_more_rows: bool,
1556 pub metric_recorded: bool,
1558}
1559
1560impl RecordFirstRowStream {
1561 pub fn new(
1563 rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
1564 execute_started: Instant,
1565 client: &SessionClient,
1566 instance_id: Option<ComputeInstanceId>,
1567 strategy: Option<StatementExecutionStrategy>,
1568 ) -> Self {
1569 let histogram = Self::histogram(client, instance_id, strategy);
1570 Self {
1571 rows,
1572 execute_started,
1573 time_to_first_row_seconds: histogram,
1574 saw_rows: false,
1575 recorded_first_row_instant: None,
1576 no_more_rows: false,
1577 metric_recorded: false,
1578 }
1579 }
1580
1581 fn histogram(
1582 client: &SessionClient,
1583 instance_id: Option<ComputeInstanceId>,
1584 strategy: Option<StatementExecutionStrategy>,
1585 ) -> Histogram {
1586 let isolation_level = *client
1587 .session
1588 .as_ref()
1589 .expect("session invariant")
1590 .vars()
1591 .transaction_isolation();
1592 let instance = match instance_id {
1593 Some(i) => Cow::Owned(i.to_string()),
1594 None => Cow::Borrowed("none"),
1595 };
1596 let strategy = match strategy {
1597 Some(s) => s.name(),
1598 None => "none",
1599 };
1600
1601 client
1602 .inner()
1603 .metrics()
1604 .time_to_first_row_seconds
1605 .with_label_values(&[instance.as_ref(), isolation_level.as_str(), strategy])
1606 }
1607
1608 pub fn record(
1611 execute_started: Instant,
1612 client: &SessionClient,
1613 instance_id: Option<ComputeInstanceId>,
1614 strategy: Option<StatementExecutionStrategy>,
1615 ) {
1616 Self::histogram(client, instance_id, strategy)
1617 .observe(execute_started.elapsed().as_secs_f64());
1618 }
1619
1620 pub async fn recv(&mut self) -> Option<PeekResponseUnary> {
1621 let msg = self.rows.next().await;
1622 if !self.saw_rows && matches!(msg, Some(PeekResponseUnary::Rows(_))) {
1623 self.saw_rows = true;
1624 self.time_to_first_row_seconds
1625 .observe(self.execute_started.elapsed().as_secs_f64());
1626 self.recorded_first_row_instant = Some(Instant::now());
1627 }
1628 if msg.is_none() {
1629 self.no_more_rows = true;
1630 }
1631 msg
1632 }
1633}