1use std::borrow::Cow;
11use std::collections::BTreeMap;
12use std::fmt::{Debug, Display, Formatter};
13use std::future::Future;
14use std::pin::{self, Pin};
15use std::sync::Arc;
16use std::time::{Duration, Instant};
17
18use anyhow::bail;
19use chrono::{DateTime, Utc};
20use derivative::Derivative;
21use futures::{Stream, StreamExt};
22use itertools::Itertools;
23use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
24use mz_auth::password::Password;
25use mz_auth::{Authenticated, AuthenticatorKind};
26use mz_build_info::BuildInfo;
27use mz_compute_types::ComputeInstanceId;
28use mz_ore::channel::OneshotReceiverExt;
29use mz_ore::collections::CollectionExt;
30use mz_ore::id_gen::{IdAllocator, IdAllocatorInnerBitSet, MAX_ORG_ID, org_id_conn_bits};
31use mz_ore::instrument;
32use mz_ore::now::{EpochMillis, NowFn, to_datetime};
33use mz_ore::str::StrExt;
34use mz_ore::task::AbortOnDropHandle;
35use mz_ore::thread::JoinOnDropHandle;
36use mz_ore::tracing::OpenTelemetryContext;
37use mz_repr::user::InternalUserMetadata;
38use mz_repr::{CatalogItemId, ColumnIndex, SqlScalarType};
39use mz_sql::ast::{Raw, Statement};
40use mz_sql::catalog::{EnvironmentId, SessionCatalog};
41use mz_sql::session::hint::ApplicationNameHint;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_sql::session::user::SUPPORT_USER;
44use mz_sql::session::vars::{
45 CLUSTER, ENABLE_FRONTEND_PEEK_SEQUENCING, OwnedVarInput, SystemVars, Var,
46};
47use mz_sql_parser::parser::{ParserStatementError, StatementParseResult};
48use prometheus::Histogram;
49use serde_json::json;
50use tokio::sync::{mpsc, oneshot};
51use tracing::{debug, error};
52use uuid::Uuid;
53
54use crate::catalog::Catalog;
55use crate::command::{
56 CatalogDump, CatalogSnapshot, Command, CopyFromStdinWriter, ExecuteResponse, Response,
57 SASLChallengeResponse, SASLVerifyProofResponse, SuperuserAttribute,
58};
59use crate::config::{ScopedParameters, ScopedParametersScope, SystemParameterFrontend};
60use crate::coord::{Coordinator, ExecuteContextGuard};
61use crate::error::AdapterError;
62use crate::metrics::{self, Metrics};
63use crate::session::{
64 EndTransactionAction, PreparedStatement, Session, SessionConfig, StateRevision, TransactionId,
65};
66use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
67use crate::telemetry::{self, EventDetails, SegmentClientExt, StatementFailureType};
68use crate::webhook::AppendWebhookResponse;
69use crate::{AdapterNotice, AppendWebhookError, PeekClient, PeekResponseUnary, StartupResponse};
70
71pub struct Handle {
77 pub(crate) session_id: Uuid,
78 pub(crate) start_instant: Instant,
79 pub(crate) _thread: JoinOnDropHandle<()>,
80}
81
82impl Handle {
83 pub fn session_id(&self) -> Uuid {
89 self.session_id
90 }
91
92 pub fn start_instant(&self) -> Instant {
94 self.start_instant
95 }
96}
97
98#[derive(Debug, Clone)]
106pub struct Client {
107 build_info: &'static BuildInfo,
108 inner_cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
109 id_alloc: IdAllocator<IdAllocatorInnerBitSet>,
110 now: NowFn,
111 metrics: Metrics,
112 environment_id: EnvironmentId,
113 segment_client: Option<mz_segment::Client>,
114}
115
116impl Client {
117 pub(crate) fn new(
118 build_info: &'static BuildInfo,
119 cmd_tx: mpsc::UnboundedSender<(OpenTelemetryContext, Command)>,
120 metrics: Metrics,
121 now: NowFn,
122 environment_id: EnvironmentId,
123 segment_client: Option<mz_segment::Client>,
124 ) -> Client {
125 let env_lower = org_id_conn_bits(&environment_id.organization_id());
133 Client {
134 build_info,
135 inner_cmd_tx: cmd_tx,
136 id_alloc: IdAllocator::new(1, MAX_ORG_ID, env_lower),
137 now,
138 metrics,
139 environment_id,
140 segment_client,
141 }
142 }
143
144 pub fn new_conn_id(&self) -> Result<ConnectionId, AdapterError> {
146 self.id_alloc.alloc().ok_or(AdapterError::IdExhaustionError)
147 }
148
149 pub fn new_session(&self, config: SessionConfig, _authenticated: Authenticated) -> Session {
155 Session::new(self.build_info, config, self.metrics().session_metrics())
159 }
160
161 pub async fn authenticate(
165 &self,
166 user: &String,
167 password: &Password,
168 ) -> Result<Authenticated, AdapterError> {
169 let (tx, rx) = oneshot::channel();
170 self.send(Command::AuthenticatePassword {
171 role_name: user.to_string(),
172 password: Some(password.clone()),
173 tx,
174 });
175 rx.await.expect("sender dropped")?;
176 Ok(Authenticated)
177 }
178
179 pub async fn generate_sasl_challenge(
182 &self,
183 user: &String,
184 client_nonce: &String,
185 ) -> Result<SASLChallengeResponse, AdapterError> {
186 let (tx, rx) = oneshot::channel();
187 self.send(Command::AuthenticateGetSASLChallenge {
188 role_name: user.to_string(),
189 nonce: client_nonce.to_string(),
190 tx,
191 });
192 let response = rx.await.expect("sender dropped")?;
193 Ok(response)
194 }
195
196 pub async fn verify_sasl_proof(
199 &self,
200 user: &String,
201 proof: &String,
202 nonce: &String,
203 mock_hash: &String,
204 ) -> Result<(SASLVerifyProofResponse, Authenticated), AdapterError> {
205 let (tx, rx) = oneshot::channel();
206 self.send(Command::AuthenticateVerifySASLProof {
207 role_name: user.to_string(),
208 proof: proof.to_string(),
209 auth_message: nonce.to_string(),
210 mock_hash: mock_hash.to_string(),
211 tx,
212 });
213 let response = rx.await.expect("sender dropped")?;
214 Ok((response, Authenticated))
215 }
216
217 pub async fn role_can_login(&self, role_name: &str) -> Result<(), AdapterError> {
219 let (tx, rx) = oneshot::channel();
220 self.send(Command::CheckRoleCanLogin {
221 role_name: role_name.to_string(),
222 tx,
223 });
224 rx.await.expect("sender dropped")
225 }
226
227 #[mz_ore::instrument(level = "debug")]
236 pub async fn startup(&self, session: Session) -> Result<SessionClient, AdapterError> {
237 let user = session.user().clone();
238 let conn_id = session.conn_id().clone();
239 let secret_key = session.secret_key();
240 let uuid = session.uuid();
241 let client_ip = session.client_ip();
242 let application_name = session.application_name().into();
243 let notice_tx = session.retain_notice_transmitter();
244
245 let (tx, rx) = oneshot::channel();
246
247 let rx = rx.with_guard(|_| {
253 self.send(Command::Terminate {
254 conn_id: conn_id.clone(),
255 tx: None,
256 });
257 });
258
259 self.send(Command::Startup {
260 tx,
261 user,
262 conn_id: conn_id.clone(),
263 secret_key,
264 uuid,
265 client_ip: client_ip.copied(),
266 application_name,
267 notice_tx,
268 });
269
270 let response = rx.await.expect("sender dropped")?;
273
274 let StartupResponse {
278 role_id,
279 write_notify,
280 session_defaults,
281 catalog,
282 storage_collections,
283 transient_id_gen,
284 optimizer_metrics,
285 persist_client,
286 statement_logging_frontend,
287 superuser_attribute,
288 } = response;
289
290 let peek_client = PeekClient::new(
291 self.clone(),
292 storage_collections,
293 transient_id_gen,
294 optimizer_metrics,
295 persist_client,
296 statement_logging_frontend,
297 );
298
299 let mut client = SessionClient {
300 inner: Some(self.clone()),
301 session: Some(session),
302 timeouts: Timeout::new(),
303 environment_id: self.environment_id.clone(),
304 segment_client: self.segment_client.clone(),
305 peek_client,
306 enable_frontend_peek_sequencing: false, };
308
309 let session = client.session();
310
311 if let SuperuserAttribute(Some(superuser)) = superuser_attribute {
314 session.apply_internal_user_metadata(InternalUserMetadata { superuser });
315 }
316
317 session.initialize_role_metadata(role_id);
318 let vars_mut = session.vars_mut();
319 for (name, val) in session_defaults {
320 if let Err(err) = vars_mut.set_default(&name, val.borrow()) {
321 tracing::error!("failed to set peristed default, {err:?}");
324 }
325 }
326 session
327 .vars_mut()
328 .end_transaction(EndTransactionAction::Commit);
329
330 session.set_builtin_table_updates(write_notify);
338
339 let catalog = catalog.for_session(session);
340
341 let cluster_active = session.vars().cluster().to_string();
342 if session.vars().welcome_message() {
343 let cluster_info = if catalog.resolve_cluster(Some(&cluster_active)).is_err() {
344 format!("{cluster_active} (does not exist)")
345 } else {
346 cluster_active.to_string()
347 };
348
349 session.add_notice(AdapterNotice::Welcome(format!(
353 "connected to Materialize v{}
354 Environment ID: {}
355 Region: {}
356 User: {}
357 Cluster: {}
358 Database: {}
359 {}
360 Session UUID: {}
361
362Issue a SQL query to get started. Need help?
363 View documentation: https://materialize.com/s/docs
364 Join our Slack community: https://materialize.com/s/chat
365 ",
366 session.vars().build_info().semver_version(),
367 self.environment_id,
368 self.environment_id.region(),
369 session.vars().user().name,
370 cluster_info,
371 session.vars().database(),
372 match session.vars().search_path() {
373 [schema] => format!("Schema: {}", schema),
374 schemas => format!(
375 "Search path: {}",
376 schemas.iter().map(|id| id.to_string()).join(", ")
377 ),
378 },
379 session.uuid(),
380 )));
381 }
382
383 if session.vars().current_object_missing_warnings() {
384 if catalog.active_database().is_none() {
385 let db = session.vars().database().into();
386 session.add_notice(AdapterNotice::UnknownSessionDatabase(db));
387 }
388 }
389
390 let cluster_var = session
393 .vars()
394 .inspect(CLUSTER.name())
395 .expect("cluster should exist");
396 if session.vars().current_object_missing_warnings()
397 && catalog.resolve_cluster(Some(&cluster_active)).is_err()
398 {
399 let cluster_notice = 'notice: {
400 if cluster_var.inspect_session_value().is_some() {
401 break 'notice Some(AdapterNotice::DefaultClusterDoesNotExist {
402 name: cluster_active,
403 kind: "session",
404 suggested_action: "Pick an extant cluster with SET CLUSTER = name. Run SHOW CLUSTERS to see available clusters.".into(),
405 });
406 }
407
408 let role_default = catalog.get_role(catalog.active_role_id());
409 let role_cluster = match role_default.vars().get(CLUSTER.name()) {
410 Some(OwnedVarInput::Flat(name)) => Some(name),
411 None => None,
412 Some(v @ OwnedVarInput::SqlSet(_)) => {
414 tracing::warn!(?v, "SqlSet found for cluster Role Default");
415 break 'notice None;
416 }
417 };
418
419 let alter_role = "with `ALTER ROLE <role> SET cluster TO <cluster>;`";
420 match role_cluster {
421 None => Some(AdapterNotice::DefaultClusterDoesNotExist {
423 name: cluster_active,
424 kind: "system",
425 suggested_action: format!(
426 "Set a default cluster for the current role {alter_role}."
427 ),
428 }),
429 Some(_) => Some(AdapterNotice::DefaultClusterDoesNotExist {
431 name: cluster_active,
432 kind: "role",
433 suggested_action: format!(
434 "Change the default cluster for the current role {alter_role}."
435 ),
436 }),
437 }
438 };
439
440 if let Some(notice) = cluster_notice {
441 session.add_notice(notice);
442 }
443 }
444
445 client.enable_frontend_peek_sequencing = ENABLE_FRONTEND_PEEK_SEQUENCING
446 .require(catalog.system_vars())
447 .is_ok();
448
449 Ok(client)
450 }
451
452 pub fn cancel_request(&self, conn_id: ConnectionIdType, secret_key: u32) {
454 self.send(Command::CancelRequest {
455 conn_id,
456 secret_key,
457 });
458 }
459
460 pub async fn support_execute_one(
463 &self,
464 sql: &str,
465 ) -> Result<Pin<Box<dyn Stream<Item = PeekResponseUnary> + Send>>, anyhow::Error> {
466 let conn_id = self.new_conn_id()?;
468 let session = self.new_session(
469 SessionConfig {
470 conn_id,
471 uuid: Uuid::new_v4(),
472 user: SUPPORT_USER.name.clone(),
473 client_ip: None,
474 external_metadata_rx: None,
475 helm_chart_version: None,
476 authenticator_kind: AuthenticatorKind::None,
477 groups: None,
478 },
479 Authenticated,
480 );
481 let mut session_client = self.startup(session).await?;
482
483 let stmts = mz_sql::parse::parse(sql)?;
485 if stmts.len() != 1 {
486 bail!("must supply exactly one query");
487 }
488 let StatementParseResult { ast: stmt, sql } = stmts.into_element();
489
490 const EMPTY_PORTAL: &str = "";
491 session_client.start_transaction(Some(1))?;
492 session_client
493 .declare(EMPTY_PORTAL.into(), stmt, sql.to_string())
494 .await?;
495
496 let execute_result = session_client
497 .execute(EMPTY_PORTAL.into(), futures::future::pending(), None)
498 .await?;
499 match execute_result {
500 (ExecuteResponse::SendingRowsStreaming { mut rows, .. }, _) => {
501 let owning_response_stream = async_stream::stream! {
506 while let Some(rows) = rows.next().await {
507 yield rows;
508 }
509 drop(session_client);
510 };
511 Ok(Box::pin(owning_response_stream))
512 }
513 r => bail!("unsupported response type: {r:?}"),
514 }
515 }
516
517 pub fn metrics(&self) -> &Metrics {
519 &self.metrics
520 }
521
522 pub fn now(&self) -> DateTime<Utc> {
524 to_datetime((self.now)())
525 }
526
527 pub async fn get_webhook_appender(
529 &self,
530 database: String,
531 schema: String,
532 name: String,
533 ) -> Result<AppendWebhookResponse, AppendWebhookError> {
534 let (tx, rx) = oneshot::channel();
535
536 self.send(Command::GetWebhook {
538 database,
539 schema,
540 name,
541 tx,
542 });
543
544 let response = rx
546 .await
547 .map_err(|_| anyhow::anyhow!("failed to receive webhook response"))?;
548
549 response
550 }
551
552 pub async fn get_system_vars(&self) -> SystemVars {
554 let (tx, rx) = oneshot::channel();
555 self.send(Command::GetSystemVars { tx });
556 rx.await.expect("coordinator unexpectedly gone")
557 }
558
559 pub async fn catalog_snapshot(&self) -> Arc<Catalog> {
561 let (tx, rx) = oneshot::channel();
562 self.send(Command::CatalogSnapshot { tx });
563 let CatalogSnapshot { catalog } = rx.await.expect("coordinator unexpectedly gone");
564 catalog
565 }
566
567 pub async fn update_scoped_system_parameters(
577 &self,
578 overrides: ScopedParameters,
579 prune_scope: Option<ScopedParametersScope>,
580 ) {
581 let (tx, rx) = oneshot::channel();
582 self.send(Command::UpdateScopedSystemParameters {
583 overrides,
584 prune_scope,
585 tx,
586 });
587 let _ = rx.await;
588 }
589
590 pub fn install_scoped_system_parameter_frontend(&self, frontend: Arc<SystemParameterFrontend>) {
595 self.send(Command::InstallScopedSystemParameterFrontend { frontend });
596 }
597
598 #[instrument(level = "debug")]
599 pub(crate) fn send(&self, cmd: Command) {
600 self.inner_cmd_tx
601 .send((OpenTelemetryContext::obtain(), cmd))
602 .expect("coordinator unexpectedly gone");
603 }
604}
605
606pub struct SessionClient {
610 inner: Option<Client>,
614 session: Option<Session>,
617 timeouts: Timeout,
618 segment_client: Option<mz_segment::Client>,
619 environment_id: EnvironmentId,
620 peek_client: PeekClient,
622 pub enable_frontend_peek_sequencing: bool,
627}
628
629impl SessionClient {
630 pub fn parse<'a>(
633 &self,
634 sql: &'a str,
635 ) -> Result<Result<Vec<StatementParseResult<'a>>, ParserStatementError>, String> {
636 match mz_sql::parse::parse_with_limit(sql) {
637 Ok(Err(e)) => {
638 self.track_statement_parse_failure(&e);
639 Ok(Err(e))
640 }
641 r => r,
642 }
643 }
644
645 fn track_statement_parse_failure(&self, parse_error: &ParserStatementError) {
646 let session = self.session.as_ref().expect("session invariant violated");
647 let Some(user_id) = session.user().external_metadata.as_ref().map(|m| m.user_id) else {
648 return;
649 };
650 let Some(segment_client) = &self.segment_client else {
651 return;
652 };
653 let Some(statement_kind) = parse_error.statement else {
654 return;
655 };
656 let Some((action, object_type)) = telemetry::analyze_audited_statement(statement_kind)
657 else {
658 return;
659 };
660 let event_type = StatementFailureType::ParseFailure;
661 let event_name = format!(
662 "{} {} {}",
663 object_type.as_title_case(),
664 action.as_title_case(),
665 event_type.as_title_case(),
666 );
667 segment_client.environment_track(
668 &self.environment_id,
669 event_name,
670 json!({
671 "statement_kind": statement_kind,
672 "error": &parse_error.error,
673 }),
674 EventDetails {
675 user_id: Some(user_id),
676 application_name: Some(session.application_name()),
677 ..Default::default()
678 },
679 );
680 }
681
682 pub async fn get_prepared_statement(
685 &mut self,
686 name: &str,
687 ) -> Result<&PreparedStatement, AdapterError> {
688 let catalog = self.catalog_snapshot("get_prepared_statement").await;
689 Coordinator::verify_prepared_statement(&catalog, self.session(), name)?;
690 Ok(self
691 .session()
692 .get_prepared_statement_unverified(name)
693 .expect("must exist"))
694 }
695
696 pub async fn prepare(
701 &mut self,
702 name: String,
703 stmt: Option<Statement<Raw>>,
704 sql: String,
705 param_types: Vec<Option<SqlScalarType>>,
706 ) -> Result<(), AdapterError> {
707 let catalog = self.catalog_snapshot("prepare").await;
708
709 let mut async_pause = false;
712 (|| {
713 fail::fail_point!("async_prepare", |val| {
714 async_pause = val.map_or(false, |val| val.parse().unwrap_or(false))
715 });
716 })();
717 if async_pause {
718 tokio::time::sleep(Duration::from_secs(1)).await;
719 };
720
721 let desc = Coordinator::describe(&catalog, self.session(), stmt.clone(), param_types)?;
722 let now = self.now();
723 let state_revision = StateRevision {
724 catalog_revision: catalog.transient_revision(),
725 session_state_revision: self.session().state_revision(),
726 };
727 self.session()
728 .set_prepared_statement(name, stmt, sql, desc, state_revision, now);
729 Ok(())
730 }
731
732 #[mz_ore::instrument(level = "debug")]
734 pub async fn declare(
735 &mut self,
736 name: String,
737 stmt: Statement<Raw>,
738 sql: String,
739 ) -> Result<(), AdapterError> {
740 let catalog = self.catalog_snapshot("declare").await;
741 let param_types = vec![];
742 let desc =
743 Coordinator::describe(&catalog, self.session(), Some(stmt.clone()), param_types)?;
744 let params = vec![];
745 let result_formats = vec![mz_pgwire_common::Format::Text; desc.arity()];
746 let now = self.now();
747 let logging = self.session().mint_logging(sql, Some(&stmt), now);
748 let state_revision = StateRevision {
749 catalog_revision: catalog.transient_revision(),
750 session_state_revision: self.session().state_revision(),
751 };
752 self.session().set_portal(
753 name,
754 desc,
755 Some(stmt),
756 logging,
757 params,
758 result_formats,
759 state_revision,
760 )?;
761 Ok(())
762 }
763
764 #[mz_ore::instrument(level = "debug")]
771 pub async fn execute(
772 &mut self,
773 portal_name: String,
774 cancel_future: impl Future<Output = std::io::Error> + Send,
775 outer_ctx_extra: Option<ExecuteContextGuard>,
776 ) -> Result<(ExecuteResponse, Instant), AdapterError> {
777 let execute_started = Instant::now();
778
779 let mut outer_ctx_extra = outer_ctx_extra;
780
781 let (portal_name, catalog) = self
795 .unroll_sql_execute(portal_name, &mut outer_ctx_extra)
796 .await?;
797
798 let peek_result = self
802 .try_frontend_peek(&portal_name, catalog, &mut outer_ctx_extra)
803 .await?;
804 if let Some(resp) = peek_result {
805 debug!("frontend peek succeeded");
806 return Ok((resp, execute_started));
809 } else {
810 debug!("frontend peek did not happen, falling back to `Command::Execute`");
811 }
816
817 let response = self
818 .send_with_cancel(
819 |tx, session| Command::Execute {
820 portal_name,
821 session,
822 tx,
823 outer_ctx_extra,
824 },
825 cancel_future,
826 )
827 .await?;
828 Ok((response, execute_started))
829 }
830
831 async fn unroll_sql_execute(
847 &mut self,
848 portal_name: String,
849 outer_ctx_extra: &mut Option<ExecuteContextGuard>,
850 ) -> Result<(String, Option<Arc<Catalog>>), AdapterError> {
851 let (stmt, params, outer_logging, outer_lifecycle_timestamps) = {
852 let session = self.session.as_ref().expect("SessionClient invariant");
853 let portal = match session.get_portal_unverified(&portal_name) {
854 Some(p) => p,
855 None => return Ok((portal_name, None)),
858 };
859 match &portal.stmt {
860 Some(stmt) => (
861 Arc::clone(stmt),
862 portal.parameters.clone(),
863 Arc::clone(&portal.logging),
864 portal.lifecycle_timestamps.clone(),
865 ),
866 None => return Ok((portal_name, None)),
867 }
868 };
869
870 if !matches!(&*stmt, Statement::Execute(_)) {
873 return Ok((portal_name, None));
874 }
875
876 let catalog = self.catalog_snapshot("unroll_sql_execute").await;
877
878 {
882 let session = self.session.as_mut().expect("SessionClient invariant");
883 Coordinator::verify_portal(&catalog, session, &portal_name)?;
884 }
885
886 {
890 let session = self.session.as_ref().expect("SessionClient invariant");
891 session
892 .metrics()
893 .query_total(&[
894 metrics::session_type_label_value(session.user()),
895 metrics::statement_type_label_value(&stmt),
896 ])
897 .inc();
898 }
899
900 let began_outer_logging = outer_ctx_extra.is_none();
910 let logging_id: Option<crate::statement_logging::StatementLoggingId> =
911 if began_outer_logging {
912 let session = self.session.as_mut().expect("SessionClient invariant");
913 let result = self
914 .peek_client
915 .statement_logging_frontend
916 .begin_statement_execution(
917 session,
918 ¶ms,
919 &outer_logging,
920 catalog.system_config(),
921 outer_lifecycle_timestamps,
922 );
923 if let Some((id, began_execution, mseh_update, prepared_statement)) = result {
924 self.peek_client.log_began_execution(
925 began_execution,
926 mseh_update,
927 prepared_statement,
928 );
929 Some(id)
930 } else {
931 None
932 }
933 } else {
934 None
935 };
936
937 let new_portal_name = match self.install_inner_portal_for_execute(&catalog, &stmt, ¶ms)
938 {
939 Ok(name) => name,
940 Err(err) => {
941 if let Some(id) = logging_id {
942 self.peek_client.log_ended_execution(
943 id,
944 StatementEndedExecutionReason::Errored {
945 error: err.to_string(),
946 },
947 );
948 }
949 return Err(err);
950 }
951 };
952
953 if began_outer_logging {
965 let (dummy_tx, _dummy_rx) = mpsc::unbounded_channel();
973 *outer_ctx_extra = Some(ExecuteContextGuard::new(logging_id, dummy_tx));
974 }
975
976 Ok((new_portal_name, Some(catalog)))
977 }
978
979 fn install_inner_portal_for_execute(
988 &mut self,
989 catalog: &Arc<Catalog>,
990 stmt: &Arc<Statement<Raw>>,
991 params: &mz_sql::plan::Params,
992 ) -> Result<String, AdapterError> {
993 use mz_sql::plan::Plan;
994
995 let execute_plan = {
996 let session = self.session.as_mut().expect("SessionClient invariant");
997 let conn_catalog = catalog.for_session(session);
998 let (resolved_stmt, resolved_ids) =
999 mz_sql::names::resolve(&conn_catalog, (**stmt).clone())?;
1000 let pcx = session.pcx();
1001 let (plan, _sql_impl_ids) = mz_sql::plan::plan(
1002 Some(pcx),
1003 &conn_catalog,
1004 resolved_stmt,
1005 params,
1006 &resolved_ids,
1007 )?;
1008 match plan {
1009 Plan::Execute(plan) => plan,
1010 other => {
1011 return Err(AdapterError::Internal(format!(
1015 "planning Statement::Execute yielded unexpected plan: {:?}",
1016 mz_sql::plan::PlanKind::from(&other),
1017 )));
1018 }
1019 }
1020 };
1021
1022 let session = self.session.as_mut().expect("SessionClient invariant");
1028 Coordinator::verify_prepared_statement(catalog, session, &execute_plan.name)?;
1029 let ps = session
1030 .get_prepared_statement_unverified(&execute_plan.name)
1031 .expect("verified above");
1032 let inner_stmt = ps.stmt().cloned();
1033 let inner_desc = ps.desc().clone();
1034 let state_revision = ps.state_revision;
1035 let inner_logging = Arc::clone(ps.logging());
1036
1037 if let Some(inner) = inner_stmt.as_ref() {
1042 if matches!(inner, Statement::Execute(_)) {
1043 return Err(AdapterError::Internal(format!(
1044 "nested EXECUTE: prepared statement {} resolves to another EXECUTE; \
1045 parser should reject `PREPARE ... AS EXECUTE ...`",
1046 execute_plan.name.quoted(),
1047 )));
1048 }
1049 }
1050
1051 session.create_new_portal(
1052 inner_stmt,
1053 inner_logging,
1054 inner_desc,
1055 execute_plan.params,
1056 Vec::new(),
1057 state_revision,
1058 )
1059 }
1060
1061 fn now(&self) -> EpochMillis {
1062 (self.inner().now)()
1063 }
1064
1065 fn now_datetime(&self) -> DateTime<Utc> {
1066 to_datetime(self.now())
1067 }
1068
1069 pub fn start_transaction(&mut self, implicit: Option<usize>) -> Result<(), AdapterError> {
1075 let now = self.now_datetime();
1076 let session = self.session.as_mut().expect("session invariant violated");
1077 let result = match implicit {
1078 None => session.start_transaction(now, None, None),
1079 Some(stmts) => {
1080 session.start_transaction_implicit(now, stmts);
1081 Ok(())
1082 }
1083 };
1084 result
1085 }
1086
1087 #[instrument(level = "debug")]
1090 pub async fn end_transaction(
1091 &mut self,
1092 action: EndTransactionAction,
1093 ) -> Result<ExecuteResponse, AdapterError> {
1094 let res = self
1095 .send(|tx, session| Command::Commit {
1096 action,
1097 session,
1098 tx,
1099 })
1100 .await;
1101 let _ = self.session().clear_transaction();
1105 res
1106 }
1107
1108 pub fn fail_transaction(&mut self) {
1110 let session = self.session.take().expect("session invariant violated");
1111 let session = session.fail_transaction();
1112 self.session = Some(session);
1113 }
1114
1115 #[instrument(level = "debug")]
1117 pub async fn catalog_snapshot(&self, context: &str) -> Arc<Catalog> {
1118 let start = std::time::Instant::now();
1119 let CatalogSnapshot { catalog } = self
1120 .send_without_session(|tx| Command::CatalogSnapshot { tx })
1121 .await;
1122 self.inner()
1123 .metrics()
1124 .catalog_snapshot_seconds
1125 .with_label_values(&[context])
1126 .observe(start.elapsed().as_secs_f64());
1127 catalog
1128 }
1129
1130 pub async fn dump_catalog(&self) -> Result<CatalogDump, AdapterError> {
1135 let catalog = self.catalog_snapshot("dump_catalog").await;
1136 catalog.dump().map_err(AdapterError::from)
1137 }
1138
1139 pub async fn check_catalog(&self) -> Result<(), serde_json::Value> {
1145 let catalog = self.catalog_snapshot("check_catalog").await;
1146 catalog.check_consistency()
1147 }
1148
1149 pub async fn check_coordinator(&self) -> Result<(), serde_json::Value> {
1155 self.send_without_session(|tx| Command::CheckConsistency { tx })
1156 .await
1157 .map_err(|inconsistencies| {
1158 serde_json::to_value(inconsistencies).unwrap_or_else(|_| {
1159 serde_json::Value::String("failed to serialize inconsistencies".to_string())
1160 })
1161 })
1162 }
1163
1164 pub async fn dump_coordinator_state(&self) -> Result<serde_json::Value, anyhow::Error> {
1165 self.send_without_session(|tx| Command::Dump { tx }).await
1166 }
1167
1168 pub fn retire_execute(
1171 &self,
1172 guard: ExecuteContextGuard,
1173 reason: StatementEndedExecutionReason,
1174 ) {
1175 if !guard.is_trivial() {
1176 let data = guard.defuse();
1177 let cmd = Command::RetireExecute { data, reason };
1178 self.inner().send(cmd);
1179 }
1180 }
1181
1182 pub async fn start_copy_from_stdin(
1188 &mut self,
1189 target_id: CatalogItemId,
1190 target_name: String,
1191 columns: Vec<ColumnIndex>,
1192 row_desc: mz_repr::RelationDesc,
1193 params: mz_pgcopy::CopyFormatParams<'static>,
1194 ) -> Result<CopyFromStdinWriter, AdapterError> {
1195 self.send(|tx, session| Command::StartCopyFromStdin {
1196 target_id,
1197 target_name,
1198 columns,
1199 row_desc,
1200 params,
1201 session,
1202 tx,
1203 })
1204 .await
1205 }
1206
1207 pub fn stage_copy_from_stdin_batches(
1212 &mut self,
1213 target_id: CatalogItemId,
1214 batches: Vec<mz_persist_client::batch::ProtoBatch>,
1215 ) -> Result<(), AdapterError> {
1216 use crate::session::{TransactionOps, WriteOp};
1217 use mz_storage_client::client::TableData;
1218
1219 self.session()
1220 .add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
1221 id: target_id,
1222 rows: TableData::Batches(batches.into()),
1223 }]))?;
1224 Ok(())
1225 }
1226
1227 pub async fn get_system_vars(&self) -> SystemVars {
1229 self.inner().get_system_vars().await
1230 }
1231
1232 pub async fn set_system_vars(
1234 &mut self,
1235 vars: BTreeMap<String, String>,
1236 ) -> Result<(), AdapterError> {
1237 let conn_id = self.session().conn_id().clone();
1238 self.send_without_session(|tx| Command::SetSystemVars { vars, conn_id, tx })
1239 .await
1240 }
1241
1242 pub async fn inject_audit_events(
1247 &mut self,
1248 events: Vec<crate::catalog::InjectedAuditEvent>,
1249 ) -> Result<(), AdapterError> {
1250 let conn_id = self.session().conn_id().clone();
1251 self.send_without_session(|tx| Command::InjectAuditEvents {
1252 events,
1253 conn_id,
1254 tx,
1255 })
1256 .await
1257 }
1258
1259 pub async fn terminate(&mut self) {
1261 let conn_id = self.session().conn_id().clone();
1262 let res = self
1263 .send_without_session(|tx| Command::Terminate {
1264 conn_id,
1265 tx: Some(tx),
1266 })
1267 .await;
1268 if let Err(e) = res {
1269 error!("Unable to terminate session: {e:?}");
1271 }
1272 self.inner = None;
1274 }
1275
1276 pub fn session(&mut self) -> &mut Session {
1278 self.session.as_mut().expect("session invariant violated")
1279 }
1280
1281 pub fn inner(&self) -> &Client {
1283 self.inner.as_ref().expect("inner invariant violated")
1284 }
1285
1286 async fn send_without_session<T, F>(&self, f: F) -> T
1287 where
1288 F: FnOnce(oneshot::Sender<T>) -> Command,
1289 {
1290 let (tx, rx) = oneshot::channel();
1291 self.inner().send(f(tx));
1292 rx.await.expect("sender dropped")
1293 }
1294
1295 #[instrument(level = "debug")]
1296 async fn send<T, F>(&mut self, f: F) -> Result<T, AdapterError>
1297 where
1298 F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
1299 {
1300 self.send_with_cancel(f, futures::future::pending()).await
1301 }
1302
1303 #[instrument(level = "debug")]
1307 async fn send_with_cancel<T, F>(
1308 &mut self,
1309 f: F,
1310 cancel_future: impl Future<Output = std::io::Error> + Send,
1311 ) -> Result<T, AdapterError>
1312 where
1313 F: FnOnce(oneshot::Sender<Response<T>>, Session) -> Command,
1314 {
1315 let session = self.session.take().expect("session invariant violated");
1316 let mut typ = None;
1317 let application_name = session.application_name();
1318 let name_hint = ApplicationNameHint::from_str(application_name);
1319 let conn_id = session.conn_id().clone();
1320 let (tx, rx) = oneshot::channel();
1321
1322 let Self {
1325 inner: inner_client,
1326 session: client_session,
1327 ..
1328 } = self;
1329
1330 let inner_client = inner_client.as_ref().expect("inner invariant violated");
1333
1334 let mut guarded_rx = rx.with_guard(|response: Response<_>| {
1340 *client_session = Some(response.session);
1341 });
1342
1343 inner_client.send({
1344 let cmd = f(tx, session);
1345 match cmd {
1349 Command::Execute { .. } => typ = Some("execute"),
1350 Command::GetWebhook { .. } => typ = Some("webhook"),
1351 Command::StartCopyFromStdin { .. }
1352 | Command::Startup { .. }
1353 | Command::AuthenticatePassword { .. }
1354 | Command::AuthenticateGetSASLChallenge { .. }
1355 | Command::AuthenticateVerifySASLProof { .. }
1356 | Command::CheckRoleCanLogin { .. }
1357 | Command::CatalogSnapshot { .. }
1358 | Command::Commit { .. }
1359 | Command::CancelRequest { .. }
1360 | Command::PrivilegedCancelRequest { .. }
1361 | Command::GetSystemVars { .. }
1362 | Command::SetSystemVars { .. }
1363 | Command::UpdateScopedSystemParameters { .. }
1364 | Command::InstallScopedSystemParameterFrontend { .. }
1365 | Command::Terminate { .. }
1366 | Command::RetireExecute { .. }
1367 | Command::CheckConsistency { .. }
1368 | Command::Dump { .. }
1369 | Command::GetComputeInstanceClient { .. }
1370 | Command::GetOracle { .. }
1371 | Command::DetermineRealTimeRecentTimestamp { .. }
1372 | Command::GetTransactionReadHoldsBundle { .. }
1373 | Command::StoreTransactionReadHolds { .. }
1374 | Command::ExecuteSlowPathPeek { .. }
1375 | Command::ExecuteSubscribe { .. }
1376 | Command::CopyToPreflight { .. }
1377 | Command::ExecuteCopyTo { .. }
1378 | Command::ExecuteSideEffectingFunc { .. }
1379 | Command::RegisterFrontendPeek { .. }
1380 | Command::UnregisterFrontendPeek { .. }
1381 | Command::ExplainTimestamp { .. }
1382 | Command::FrontendStatementLogging(..)
1383 | Command::InjectAuditEvents { .. } => {}
1384 };
1385 cmd
1386 });
1387
1388 let mut cancel_future = pin::pin!(cancel_future);
1389 let mut cancelled = false;
1390 loop {
1391 tokio::select! {
1392 res = &mut guarded_rx => {
1393 drop(guarded_rx);
1395
1396 let res = res.expect("sender dropped");
1397 let status = res.result.is_ok().then_some("success").unwrap_or("error");
1398 if let Err(err) = res.result.as_ref() {
1399 if name_hint.should_trace_errors() {
1400 tracing::warn!(?err, ?name_hint, "adapter response error");
1401 }
1402 }
1403
1404 if let Some(typ) = typ {
1405 inner_client
1406 .metrics
1407 .commands
1408 .with_label_values(&[typ, status, name_hint.as_str()])
1409 .inc();
1410 }
1411 *client_session = Some(res.session);
1412 return res.result;
1413 },
1414 _err = &mut cancel_future, if !cancelled => {
1415 cancelled = true;
1416 inner_client.send(Command::PrivilegedCancelRequest {
1417 conn_id: conn_id.clone(),
1418 });
1419 }
1420 };
1421 }
1422 }
1423
1424 pub fn add_idle_in_transaction_session_timeout(&mut self) {
1425 let session = self.session();
1426 let timeout_dur = session.vars().idle_in_transaction_session_timeout();
1427 if !timeout_dur.is_zero() {
1428 let timeout_dur = timeout_dur.clone();
1429 if let Some(txn) = session.transaction().inner() {
1430 let txn_id = txn.id.clone();
1431 let timeout = TimeoutType::IdleInTransactionSession(txn_id);
1432 self.timeouts.add_timeout(timeout, timeout_dur);
1433 }
1434 }
1435 }
1436
1437 pub fn remove_idle_in_transaction_session_timeout(&mut self) {
1438 let session = self.session();
1439 if let Some(txn) = session.transaction().inner() {
1440 let txn_id = txn.id.clone();
1441 self.timeouts
1442 .remove_timeout(&TimeoutType::IdleInTransactionSession(txn_id));
1443 }
1444 }
1445
1446 pub async fn recv_timeout(&mut self) -> Option<TimeoutType> {
1453 self.timeouts.recv().await
1454 }
1455
1456 pub(crate) async fn try_frontend_peek(
1464 &mut self,
1465 portal_name: &str,
1466 catalog: Option<Arc<Catalog>>,
1467 outer_ctx_extra: &mut Option<ExecuteContextGuard>,
1468 ) -> Result<Option<ExecuteResponse>, AdapterError> {
1469 if self.enable_frontend_peek_sequencing {
1470 let session = self.session.as_mut().expect("SessionClient invariant");
1471 self.peek_client
1472 .try_frontend_peek(portal_name, catalog, session, outer_ctx_extra)
1473 .await
1474 } else {
1475 Ok(None)
1476 }
1477 }
1478}
1479
1480impl Drop for SessionClient {
1481 fn drop(&mut self) {
1482 if let Some(session) = self.session.take() {
1486 if let Some(inner) = &self.inner {
1489 inner.send(Command::Terminate {
1490 conn_id: session.conn_id().clone(),
1491 tx: None,
1492 })
1493 }
1494 }
1495 }
1496}
1497
1498#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug)]
1499pub enum TimeoutType {
1500 IdleInTransactionSession(TransactionId),
1501}
1502
1503impl Display for TimeoutType {
1504 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1505 match self {
1506 TimeoutType::IdleInTransactionSession(txn_id) => {
1507 writeln!(f, "Idle in transaction session for transaction '{txn_id}'")
1508 }
1509 }
1510 }
1511}
1512
1513impl From<TimeoutType> for AdapterError {
1514 fn from(timeout: TimeoutType) -> Self {
1515 match timeout {
1516 TimeoutType::IdleInTransactionSession(_) => {
1517 AdapterError::IdleInTransactionSessionTimeout
1518 }
1519 }
1520 }
1521}
1522
1523struct Timeout {
1524 tx: mpsc::UnboundedSender<TimeoutType>,
1525 rx: mpsc::UnboundedReceiver<TimeoutType>,
1526 active_timeouts: BTreeMap<TimeoutType, AbortOnDropHandle<()>>,
1527}
1528
1529impl Timeout {
1530 fn new() -> Self {
1531 let (tx, rx) = mpsc::unbounded_channel();
1532 Timeout {
1533 tx,
1534 rx,
1535 active_timeouts: BTreeMap::new(),
1536 }
1537 }
1538
1539 async fn recv(&mut self) -> Option<TimeoutType> {
1548 self.rx.recv().await
1549 }
1550
1551 fn add_timeout(&mut self, timeout: TimeoutType, duration: Duration) {
1552 let tx = self.tx.clone();
1553 let timeout_key = timeout.clone();
1554 let handle = mz_ore::task::spawn(|| format!("{timeout_key}"), async move {
1555 tokio::time::sleep(duration).await;
1556 let _ = tx.send(timeout);
1557 })
1558 .abort_on_drop();
1559 self.active_timeouts.insert(timeout_key, handle);
1560 }
1561
1562 fn remove_timeout(&mut self, timeout: &TimeoutType) {
1563 self.active_timeouts.remove(timeout);
1564
1565 let mut timeouts = Vec::new();
1567 while let Ok(pending_timeout) = self.rx.try_recv() {
1568 if timeout != &pending_timeout {
1569 timeouts.push(pending_timeout);
1570 }
1571 }
1572 for pending_timeout in timeouts {
1573 self.tx.send(pending_timeout).expect("rx is in this struct");
1574 }
1575 }
1576}
1577
1578#[derive(Derivative)]
1582#[derivative(Debug)]
1583pub struct RecordFirstRowStream {
1584 #[derivative(Debug = "ignore")]
1586 pub rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
1587 pub execute_started: Instant,
1589 pub time_to_first_row_seconds: Histogram,
1592 pub saw_rows: bool,
1594 pub recorded_first_row_instant: Option<Instant>,
1596 pub no_more_rows: bool,
1598 pub metric_recorded: bool,
1600}
1601
1602impl RecordFirstRowStream {
1603 pub fn new(
1605 rows: Box<dyn Stream<Item = PeekResponseUnary> + Unpin + Send + Sync>,
1606 execute_started: Instant,
1607 client: &SessionClient,
1608 instance_id: Option<ComputeInstanceId>,
1609 strategy: Option<StatementExecutionStrategy>,
1610 ) -> Self {
1611 let histogram = Self::histogram(client, instance_id, strategy);
1612 Self {
1613 rows,
1614 execute_started,
1615 time_to_first_row_seconds: histogram,
1616 saw_rows: false,
1617 recorded_first_row_instant: None,
1618 no_more_rows: false,
1619 metric_recorded: false,
1620 }
1621 }
1622
1623 fn histogram(
1624 client: &SessionClient,
1625 instance_id: Option<ComputeInstanceId>,
1626 strategy: Option<StatementExecutionStrategy>,
1627 ) -> Histogram {
1628 let session = client.session.as_ref().expect("session invariant");
1629 let isolation_level = *session.vars().transaction_isolation();
1630 let name_hint = ApplicationNameHint::from_str(session.application_name());
1631 let instance = match instance_id {
1632 Some(i) => Cow::Owned(i.to_string()),
1633 None => Cow::Borrowed("none"),
1634 };
1635 let strategy = match strategy {
1636 Some(s) => s.name(),
1637 None => "none",
1638 };
1639
1640 client
1641 .inner()
1642 .metrics()
1643 .time_to_first_row_seconds
1644 .with_label_values(&[
1645 instance.as_ref(),
1646 isolation_level.as_variant_str(),
1647 strategy,
1648 name_hint.as_str(),
1649 ])
1650 }
1651
1652 pub fn record(
1655 execute_started: Instant,
1656 client: &SessionClient,
1657 instance_id: Option<ComputeInstanceId>,
1658 strategy: Option<StatementExecutionStrategy>,
1659 ) {
1660 Self::histogram(client, instance_id, strategy)
1661 .observe(execute_started.elapsed().as_secs_f64());
1662 }
1663
1664 pub async fn recv(&mut self) -> Option<PeekResponseUnary> {
1665 let msg = self.rows.next().await;
1666 if !self.saw_rows && matches!(msg, Some(PeekResponseUnary::Rows(_))) {
1667 self.saw_rows = true;
1668 self.time_to_first_row_seconds
1669 .observe(self.execute_started.elapsed().as_secs_f64());
1670 self.recorded_first_row_instant = Some(Instant::now());
1671 }
1672 if msg.is_none() {
1673 self.no_more_rows = true;
1674 }
1675 msg
1676 }
1677}