mz_adapter/coord/
statement_logging.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::sync::Arc;
12
13use bytes::BytesMut;
14use mz_controller_types::ClusterId;
15use mz_ore::now::{NowFn, epoch_to_uuid_v7, to_datetime};
16use mz_ore::task::spawn;
17use mz_ore::{cast::CastFrom, cast::CastInto, now::EpochMillis};
18use mz_repr::adt::array::ArrayDimension;
19use mz_repr::adt::timestamp::TimestampLike;
20use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker, Timestamp};
21use mz_sql::ast::display::AstDisplay;
22use mz_sql::ast::{AstInfo, Statement};
23use mz_sql::plan::Params;
24use mz_sql::session::metadata::SessionMetadata;
25use mz_sql_parser::ast::{StatementKind, statement_kind_label_value};
26use mz_storage_client::controller::IntrospectionType;
27use qcell::QCell;
28use rand::SeedableRng;
29use rand::{distributions::Bernoulli, prelude::Distribution, thread_rng};
30use sha2::{Digest, Sha256};
31use tokio::time::MissedTickBehavior;
32use tracing::debug;
33use uuid::Uuid;
34
35use crate::coord::{ConnMeta, Coordinator};
36use crate::session::{LifecycleTimestamps, Session};
37use crate::statement_logging::{
38    SessionHistoryEvent, StatementBeganExecutionRecord, StatementEndedExecutionReason,
39    StatementEndedExecutionRecord, StatementLifecycleEvent, StatementPreparedRecord,
40};
41
42use super::Message;
43
/// Metadata required for logging a prepared statement.
///
/// `Coordinator::log_prepared_statement` replaces a `StillToLog` value with `AlreadyLogged`
/// once the statement has actually been emitted to the log.
#[derive(Debug)]
pub enum PreparedStatementLoggingInfo {
    /// The statement has already been logged; we don't need to log it
    /// again if a future execution hits the sampling rate; we merely
    /// need to reference the corresponding UUID.
    AlreadyLogged { uuid: Uuid },
    /// The statement has not yet been logged; if a future execution
    /// hits the sampling rate, we need to log it at that point.
    StillToLog {
        /// The SQL text of the statement. For statement kinds that may carry sensitive or
        /// bulky data this is already the redacted rendering (see
        /// [`PreparedStatementLoggingInfo::still_to_log`]).
        sql: String,
        /// The SQL text of the statement, redacted to follow our data management
        /// policy
        redacted_sql: String,
        /// When the statement was prepared
        prepared_at: EpochMillis,
        /// The name with which the statement was prepared
        name: String,
        /// The ID of the session that prepared the statement
        session_id: Uuid,
        /// Whether we have already recorded this in the "would have logged" metric
        accounted: bool,
        /// The top-level kind of the statement (e.g., `Select`), or `None` for an empty statement
        kind: Option<StatementKind>,

        /// Private type that forces use of the [`PreparedStatementLoggingInfo::still_to_log`]
        /// constructor.
        _sealed: sealed::Private,
    },
}
75
76impl PreparedStatementLoggingInfo {
77    /// Constructor for the [`PreparedStatementLoggingInfo::StillToLog`] variant that ensures SQL
78    /// statements are properly redacted.
79    pub fn still_to_log<A: AstInfo>(
80        raw_sql: String,
81        stmt: Option<&Statement<A>>,
82        prepared_at: EpochMillis,
83        name: String,
84        session_id: Uuid,
85        accounted: bool,
86    ) -> Self {
87        let kind = stmt.map(StatementKind::from);
88        let sql = match kind {
89            // Always redact SQL statements that may contain sensitive information.
90            // CREATE SECRET and ALTER SECRET statements can contain secret values, so we redact them.
91            // INSERT, UPDATE, and EXECUTE statements can include large amounts of user data, so we redact them for both
92            // data privacy and to avoid logging excessive data.
93            Some(
94                StatementKind::CreateSecret
95                | StatementKind::AlterSecret
96                | StatementKind::Insert
97                | StatementKind::Update
98                | StatementKind::Execute,
99            ) => stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
100            _ => raw_sql,
101        };
102
103        PreparedStatementLoggingInfo::StillToLog {
104            sql,
105            redacted_sql: stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
106            prepared_at,
107            name,
108            session_id,
109            accounted,
110            kind,
111            _sealed: sealed::Private,
112        }
113    }
114}
115
/// Handle identifying a statement execution whose beginning was logged.
///
/// The wrapped UUID is private to this module; `begin_statement_execution` hands out a value
/// only when the execution was actually logged, and the follow-up calls
/// (`end_statement_execution`, the `set_*` methods, `record_statement_lifecycle_event`)
/// take it as proof of that.
#[derive(Copy, Clone, Debug, Ord, Eq, PartialOrd, PartialEq)]
pub struct StatementLoggingId(Uuid);
118
/// The pair of rows produced when a prepared statement is logged for the first time.
#[derive(Debug)]
pub(crate) struct PreparedStatementEvent {
    /// Row appended to the `PreparedStatementHistory` introspection collection when the log
    /// is drained.
    prepared_statement: Row,
    /// Row appended to the `SqlText` introspection collection when the log is drained.
    sql_text: Row,
}
124
/// In-memory state of statement logging: executions in flight, sessions not yet logged,
/// rows buffered until the next drain, and the throttling budget.
#[derive(Debug)]
pub(crate) struct StatementLogging {
    /// Information about statement executions that have been logged
    /// but not finished.
    ///
    /// This map needs to have enough state left over to later retract
    /// the system table entries (so that we can update them when the
    /// execution finished.)
    executions_begun: BTreeMap<Uuid, StatementBeganExecutionRecord>,

    /// Information about sessions that have been started, but which
    /// have not yet been logged in `mz_session_history`.
    /// They may be logged as part of a statement being executed (and chosen for logging).
    unlogged_sessions: BTreeMap<Uuid, SessionHistoryEvent>,

    /// A reproducible RNG for deciding whether to sample statement executions.
    /// Only used by tests; otherwise, `rand::thread_rng()` is used.
    /// Controlled by the system var `statement_logging_use_reproducible_rng`.
    reproducible_rng: rand_chacha::ChaCha8Rng,

    /// Buffered statement execution rows, each with its diff (insertion or retraction).
    pending_statement_execution_events: Vec<(Row, Diff)>,
    /// Buffered prepared-statement rows together with their SQL text rows.
    pending_prepared_statement_events: Vec<PreparedStatementEvent>,
    /// Buffered session history rows.
    pending_session_events: Vec<Row>,
    /// Buffered statement lifecycle rows.
    pending_statement_lifecycle_events: Vec<Row>,

    /// Clock used to compute elapsed time for throttling.
    now: NowFn,

    /// The number of bytes that we are allowed to emit for statement logging without being throttled.
    /// Increases at a rate of [`mz_sql::session::vars::STATEMENT_LOGGING_TARGET_DATA_RATE`] per second,
    /// up to a max value of [`mz_sql::session::vars::STATEMENT_LOGGING_MAX_DATA_CREDIT`].
    tokens: u64,
    /// The time, in whole seconds since the epoch, at which the throttling check last ran.
    /// It is updated on every check, whether or not the statement ended up being logged.
    last_logged_ts_seconds: u64,
    /// The number of statements that have been throttled since the last successfully logged statement.
    throttled_count: usize,
}
161
162impl StatementLogging {
163    pub(crate) fn new(now: NowFn) -> Self {
164        let last_logged_ts_seconds = (now)() / 1000;
165        Self {
166            executions_begun: BTreeMap::new(),
167            unlogged_sessions: BTreeMap::new(),
168            reproducible_rng: rand_chacha::ChaCha8Rng::seed_from_u64(42),
169            pending_statement_execution_events: Vec::new(),
170            pending_prepared_statement_events: Vec::new(),
171            pending_session_events: Vec::new(),
172            pending_statement_lifecycle_events: Vec::new(),
173            tokens: 0,
174            last_logged_ts_seconds,
175            now: now.clone(),
176            throttled_count: 0,
177        }
178    }
179
180    /// Check if we need to drop a statement
181    /// due to throttling, and update internal data structures appropriately.
182    ///
183    /// Returns `None` if we must throttle this statement, and `Some(n)` otherwise, where `n`
184    /// is the number of statements that were dropped due to throttling before this one.
185    fn throttling_check(
186        &mut self,
187        cost: u64,
188        target_data_rate: u64,
189        max_data_credit: Option<u64>,
190    ) -> Option<usize> {
191        let ts = (self.now)() / 1000;
192        // We use saturating_sub here because system time isn't monotonic, causing cases
193        // when last_logged_ts_seconds is greater than ts.
194        let elapsed = ts.saturating_sub(self.last_logged_ts_seconds);
195        self.last_logged_ts_seconds = ts;
196        self.tokens = self
197            .tokens
198            .saturating_add(target_data_rate.saturating_mul(elapsed));
199        if let Some(max_data_credit) = max_data_credit {
200            self.tokens = self.tokens.min(max_data_credit);
201        }
202        if let Some(remaining) = self.tokens.checked_sub(cost) {
203            debug!("throttling check passed. tokens remaining: {remaining}; cost: {cost}");
204            self.tokens = remaining;
205            Some(std::mem::take(&mut self.throttled_count))
206        } else {
207            debug!(
208                "throttling check failed. tokens available: {}; cost: {cost}",
209                self.tokens
210            );
211            self.throttled_count += 1;
212            None
213        }
214    }
215}
216
217impl Coordinator {
218    pub(crate) fn spawn_statement_logging_task(&self) {
219        let internal_cmd_tx = self.internal_cmd_tx.clone();
220        spawn(|| "statement_logging", async move {
221            // TODO[btv] make this configurable via LD?
222            // Although... Logging every 5 seconds seems like it
223            // should have acceptable cost for now, since we do a
224            // group commit for tables every 1s anyway.
225            let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
226            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
227            loop {
228                interval.tick().await;
229                let _ = internal_cmd_tx.send(Message::DrainStatementLog);
230            }
231        });
232    }
233
234    #[mz_ore::instrument(level = "debug")]
235    pub(crate) fn drain_statement_log(&mut self) {
236        let session_updates = std::mem::take(&mut self.statement_logging.pending_session_events)
237            .into_iter()
238            .map(|update| (update, Diff::ONE))
239            .collect();
240        let (prepared_statement_updates, sql_text_updates) =
241            std::mem::take(&mut self.statement_logging.pending_prepared_statement_events)
242                .into_iter()
243                .map(
244                    |PreparedStatementEvent {
245                         prepared_statement,
246                         sql_text,
247                     }| {
248                        ((prepared_statement, Diff::ONE), (sql_text, Diff::ONE))
249                    },
250                )
251                .unzip::<_, _, Vec<_>, Vec<_>>();
252        let statement_execution_updates =
253            std::mem::take(&mut self.statement_logging.pending_statement_execution_events);
254        let statement_lifecycle_updates =
255            std::mem::take(&mut self.statement_logging.pending_statement_lifecycle_events)
256                .into_iter()
257                .map(|update| (update, Diff::ONE))
258                .collect();
259
260        use IntrospectionType::*;
261        for (type_, updates) in [
262            (SessionHistory, session_updates),
263            (PreparedStatementHistory, prepared_statement_updates),
264            (StatementExecutionHistory, statement_execution_updates),
265            (StatementLifecycleHistory, statement_lifecycle_updates),
266            (SqlText, sql_text_updates),
267        ] {
268            if !updates.is_empty() && !self.controller.read_only() {
269                self.controller
270                    .storage
271                    .append_introspection_updates(type_, updates);
272            }
273        }
274    }
275
276    /// Check whether we need to do throttling (i.e., whether `STATEMENT_LOGGING_TARGET_DATA_RATE` is set).
277    /// If so, actually do the check.
278    ///
279    /// Returns `None` if we must throttle this statement, and `Some(n)` otherwise, where `n`
280    /// is the number of statements that were dropped due to throttling before this one.
281    fn statement_logging_throttling_check(&mut self, cost: usize) -> Option<usize> {
282        let Some(target_data_rate) = self
283            .catalog
284            .system_config()
285            .statement_logging_target_data_rate()
286        else {
287            return Some(std::mem::take(&mut self.statement_logging.throttled_count));
288        };
289        let max_data_credit = self
290            .catalog
291            .system_config()
292            .statement_logging_max_data_credit();
293        self.statement_logging.throttling_check(
294            cost.cast_into(),
295            target_data_rate.cast_into(),
296            max_data_credit.map(CastInto::cast_into),
297        )
298    }
299
300    /// Returns any statement logging events needed for a particular
301    /// prepared statement. Possibly mutates the `PreparedStatementLoggingInfo` metadata.
302    ///
303    /// This function does not do a sampling check, and assumes we did so in a higher layer.
304    ///
305    /// It _does_ do a throttling check, and returns `None` if we must not log due to throttling.
306    pub(crate) fn log_prepared_statement(
307        &mut self,
308        session: &mut Session,
309        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
310    ) -> Option<(
311        Option<(StatementPreparedRecord, PreparedStatementEvent)>,
312        Uuid,
313    )> {
314        let logging = session.qcell_rw(&*logging);
315        let mut out = None;
316
317        let uuid = match logging {
318            PreparedStatementLoggingInfo::AlreadyLogged { uuid } => *uuid,
319            PreparedStatementLoggingInfo::StillToLog {
320                sql,
321                redacted_sql,
322                prepared_at,
323                name,
324                session_id,
325                accounted,
326                kind,
327                _sealed: _,
328            } => {
329                assert!(
330                    *accounted,
331                    "accounting for logging should be done in `begin_statement_execution`"
332                );
333                let uuid = epoch_to_uuid_v7(prepared_at);
334                let sql = std::mem::take(sql);
335                let redacted_sql = std::mem::take(redacted_sql);
336                let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();
337                let record = StatementPreparedRecord {
338                    id: uuid,
339                    sql_hash,
340                    name: std::mem::take(name),
341                    session_id: *session_id,
342                    prepared_at: *prepared_at,
343                    kind: *kind,
344                };
345                let mut mpsh_row = Row::default();
346                let mut mpsh_packer = mpsh_row.packer();
347                Self::pack_statement_prepared_update(&record, &mut mpsh_packer);
348                let sql_row = Row::pack([
349                    Datum::TimestampTz(
350                        to_datetime(*prepared_at)
351                            .truncate_day()
352                            .try_into()
353                            .expect("must fit"),
354                    ),
355                    Datum::Bytes(sql_hash.as_slice()),
356                    Datum::String(sql.as_str()),
357                    Datum::String(redacted_sql.as_str()),
358                ]);
359
360                let cost = mpsh_packer.byte_len() + sql_row.byte_len();
361                let throttled_count = self.statement_logging_throttling_check(cost)?;
362                mpsh_packer.push(Datum::UInt64(throttled_count.try_into().expect("must fit")));
363                out = Some((
364                    record,
365                    PreparedStatementEvent {
366                        prepared_statement: mpsh_row,
367                        sql_text: sql_row,
368                    },
369                ));
370
371                *logging = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
372                uuid
373            }
374        };
375        Some((out, uuid))
376    }
377    /// The rate at which statement execution should be sampled.
378    /// This is the value of the session var `statement_logging_sample_rate`,
379    /// constrained by the system var `statement_logging_max_sample_rate`.
380    pub fn statement_execution_sample_rate(&self, session: &Session) -> f64 {
381        let system: f64 = self
382            .catalog()
383            .system_config()
384            .statement_logging_max_sample_rate()
385            .try_into()
386            .expect("value constrained to be convertible to f64");
387        let user: f64 = session
388            .vars()
389            .get_statement_logging_sample_rate()
390            .try_into()
391            .expect("value constrained to be convertible to f64");
392        f64::min(system, user)
393    }
394
395    /// Record the end of statement execution for a statement whose beginning was logged.
396    /// It is an error to call this function for a statement whose beginning was not logged
397    /// (because it was not sampled). Requiring the opaque `StatementLoggingId` type,
398    /// which is only instantiated by `begin_statement_execution` if the statement is actually logged,
399    /// should prevent this.
400    pub fn end_statement_execution(
401        &mut self,
402        id: StatementLoggingId,
403        reason: StatementEndedExecutionReason,
404    ) {
405        let StatementLoggingId(uuid) = id;
406        let now = self.now();
407        let ended_record = StatementEndedExecutionRecord {
408            id: uuid,
409            reason,
410            ended_at: now,
411        };
412
413        let began_record = self
414            .statement_logging
415            .executions_begun
416            .remove(&uuid)
417            .expect(
418                "matched `begin_statement_execution` and `end_statement_execution` invocations",
419            );
420        for (row, diff) in
421            Self::pack_statement_ended_execution_updates(&began_record, &ended_record)
422        {
423            self.statement_logging
424                .pending_statement_execution_events
425                .push((row, diff));
426        }
427        self.record_statement_lifecycle_event(
428            &id,
429            &StatementLifecycleEvent::ExecutionFinished,
430            now,
431        );
432    }
433
    /// Appends to `packer` the columns that the "began" row and the completed row of a
    /// statement execution have in common.
    ///
    /// Columns are appended positionally, in exactly the order written below; callers append
    /// the remaining (completion-related) columns afterwards.
    // NOTE(review): the order presumably has to match the schema of the statement execution
    // history collection — confirm against its definition before reordering anything.
    fn pack_statement_execution_inner(
        record: &StatementBeganExecutionRecord,
        packer: &mut RowPacker,
    ) {
        // Destructure exhaustively so that adding a field to the record forces a decision
        // here about whether it needs to be packed.
        let StatementBeganExecutionRecord {
            id,
            prepared_statement_id,
            sample_rate,
            params,
            began_at,
            cluster_id,
            cluster_name,
            database_name,
            search_path,
            application_name,
            transaction_isolation,
            execution_timestamp,
            transaction_id,
            transient_index_id,
            mz_version,
        } = record;

        // The IDs are packed as strings; keep the owned strings alive for the borrowed datums.
        let cluster = cluster_id.map(|id| id.to_string());
        let transient_index_id = transient_index_id.map(|id| id.to_string());
        packer.extend([
            Datum::Uuid(*id),
            Datum::Uuid(*prepared_statement_id),
            Datum::Float64((*sample_rate).into()),
            match &cluster {
                None => Datum::Null,
                Some(cluster_id) => Datum::String(cluster_id),
            },
            Datum::String(&*application_name),
            cluster_name.as_ref().map(String::as_str).into(),
            Datum::String(database_name),
        ]);
        // The search path is packed as a list of strings.
        packer.push_list(search_path.iter().map(|s| Datum::String(s)));
        packer.extend([
            Datum::String(&*transaction_isolation),
            (*execution_timestamp).into(),
            Datum::UInt64(*transaction_id),
            match &transient_index_id {
                None => Datum::Null,
                Some(transient_index_id) => Datum::String(transient_index_id),
            },
        ]);
        // Parameters are packed as a one-dimensional array; an absent parameter value is
        // packed as NULL.
        packer
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: params.len(),
                }],
                params
                    .iter()
                    .map(|p| Datum::from(p.as_ref().map(String::as_str))),
            )
            .expect("correct array dimensions");
        packer.push(Datum::from(mz_version.as_str()));
        packer.push(Datum::TimestampTz(
            to_datetime(*began_at).try_into().expect("Sane system time"),
        ));
    }
496
497    fn pack_statement_began_execution_update(record: &StatementBeganExecutionRecord) -> Row {
498        let mut row = Row::default();
499        let mut packer = row.packer();
500        Self::pack_statement_execution_inner(record, &mut packer);
501        packer.extend([
502            // finished_at
503            Datum::Null,
504            // finished_status
505            Datum::Null,
506            // error_message
507            Datum::Null,
508            // result_size
509            Datum::Null,
510            // rows_returned
511            Datum::Null,
512            // execution_status
513            Datum::Null,
514        ]);
515        row
516    }
517
518    fn pack_statement_prepared_update(record: &StatementPreparedRecord, packer: &mut RowPacker) {
519        let StatementPreparedRecord {
520            id,
521            session_id,
522            name,
523            sql_hash,
524            prepared_at,
525            kind,
526        } = record;
527        packer.extend([
528            Datum::Uuid(*id),
529            Datum::Uuid(*session_id),
530            Datum::String(name.as_str()),
531            Datum::Bytes(sql_hash.as_slice()),
532            Datum::TimestampTz(to_datetime(*prepared_at).try_into().expect("must fit")),
533            kind.map(statement_kind_label_value).into(),
534        ]);
535    }
536
537    fn pack_session_history_update(event: &SessionHistoryEvent) -> Row {
538        let SessionHistoryEvent {
539            id,
540            connected_at,
541            application_name,
542            authenticated_user,
543        } = event;
544        Row::pack_slice(&[
545            Datum::Uuid(*id),
546            Datum::TimestampTz(
547                mz_ore::now::to_datetime(*connected_at)
548                    .try_into()
549                    .expect("must fit"),
550            ),
551            Datum::String(&*application_name),
552            Datum::String(&*authenticated_user),
553        ])
554    }
555
556    fn pack_statement_lifecycle_event(
557        StatementLoggingId(uuid): &StatementLoggingId,
558        event: &StatementLifecycleEvent,
559        when: EpochMillis,
560    ) -> Row {
561        Row::pack_slice(&[
562            Datum::Uuid(*uuid),
563            Datum::String(event.as_str()),
564            Datum::TimestampTz(mz_ore::now::to_datetime(when).try_into().expect("must fit")),
565        ])
566    }
567
568    pub fn pack_full_statement_execution_update(
569        began_record: &StatementBeganExecutionRecord,
570        ended_record: &StatementEndedExecutionRecord,
571    ) -> Row {
572        let mut row = Row::default();
573        let mut packer = row.packer();
574        Self::pack_statement_execution_inner(began_record, &mut packer);
575        let (status, error_message, result_size, rows_returned, execution_strategy) =
576            match &ended_record.reason {
577                StatementEndedExecutionReason::Success {
578                    result_size,
579                    rows_returned,
580                    execution_strategy,
581                } => (
582                    "success",
583                    None,
584                    result_size.map(|rs| i64::try_from(rs).expect("must fit")),
585                    rows_returned.map(|rr| i64::try_from(rr).expect("must fit")),
586                    execution_strategy.map(|es| es.name()),
587                ),
588                StatementEndedExecutionReason::Canceled => ("canceled", None, None, None, None),
589                StatementEndedExecutionReason::Errored { error } => {
590                    ("error", Some(error.as_str()), None, None, None)
591                }
592                StatementEndedExecutionReason::Aborted => ("aborted", None, None, None, None),
593            };
594        packer.extend([
595            Datum::TimestampTz(
596                to_datetime(ended_record.ended_at)
597                    .try_into()
598                    .expect("Sane system time"),
599            ),
600            status.into(),
601            error_message.into(),
602            result_size.into(),
603            rows_returned.into(),
604            execution_strategy.into(),
605        ]);
606        row
607    }
608
609    pub fn pack_statement_ended_execution_updates(
610        began_record: &StatementBeganExecutionRecord,
611        ended_record: &StatementEndedExecutionRecord,
612    ) -> [(Row, Diff); 2] {
613        let retraction = Self::pack_statement_began_execution_update(began_record);
614        let new = Self::pack_full_statement_execution_update(began_record, ended_record);
615        [(retraction, Diff::MINUS_ONE), (new, Diff::ONE)]
616    }
617
618    /// Mutate a statement execution record via the given function `f`.
619    fn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>(
620        &mut self,
621        StatementLoggingId(id): StatementLoggingId,
622        f: F,
623    ) {
624        let record = self
625            .statement_logging
626            .executions_begun
627            .get_mut(&id)
628            .expect("mutate_record must not be called after execution ends");
629        let retraction = Self::pack_statement_began_execution_update(record);
630        self.statement_logging
631            .pending_statement_execution_events
632            .push((retraction, Diff::MINUS_ONE));
633        f(record);
634        let update = Self::pack_statement_began_execution_update(record);
635        self.statement_logging
636            .pending_statement_execution_events
637            .push((update, Diff::ONE));
638    }
639
640    /// Set the `cluster_id` for a statement, once it's known.
641    pub fn set_statement_execution_cluster(
642        &mut self,
643        id: StatementLoggingId,
644        cluster_id: ClusterId,
645    ) {
646        let cluster_name = self.catalog().get_cluster(cluster_id).name.clone();
647        self.mutate_record(id, |record| {
648            record.cluster_name = Some(cluster_name);
649            record.cluster_id = Some(cluster_id);
650        });
651    }
652
653    /// Set the `execution_timestamp` for a statement, once it's known
654    pub fn set_statement_execution_timestamp(
655        &mut self,
656        id: StatementLoggingId,
657        timestamp: Timestamp,
658    ) {
659        self.mutate_record(id, |record| {
660            record.execution_timestamp = Some(u64::from(timestamp));
661        });
662    }
663
664    pub fn set_transient_index_id(&mut self, id: StatementLoggingId, transient_index_id: GlobalId) {
665        self.mutate_record(id, |record| {
666            record.transient_index_id = Some(transient_index_id)
667        });
668    }
669
670    /// Possibly record the beginning of statement execution, depending on a randomly-chosen value.
671    /// If the execution beginning was indeed logged, returns a `StatementLoggingId` that must be
672    /// passed to `end_statement_execution` to record when it ends.
673    ///
674    /// `lifecycle_timestamps` has timestamps that come from the Adapter frontend (`mz-pgwire`) part
675    /// of the lifecycle.
676    pub fn begin_statement_execution(
677        &mut self,
678        session: &mut Session,
679        params: &Params,
680        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
681        lifecycle_timestamps: Option<LifecycleTimestamps>,
682    ) -> Option<StatementLoggingId> {
683        let enable_internal_statement_logging = self
684            .catalog()
685            .system_config()
686            .enable_internal_statement_logging();
687        if session.user().is_internal() && !enable_internal_statement_logging {
688            return None;
689        }
690        let sample_rate = self.statement_execution_sample_rate(session);
691
692        let distribution = Bernoulli::new(sample_rate).expect("rate must be in range [0, 1]");
693        let sample = if self
694            .catalog()
695            .system_config()
696            .statement_logging_use_reproducible_rng()
697        {
698            distribution.sample(&mut self.statement_logging.reproducible_rng)
699        } else {
700            distribution.sample(&mut thread_rng())
701        };
702
703        // Track how many statements we're recording.
704        let sampled_label = sample.then_some("true").unwrap_or("false");
705        self.metrics
706            .statement_logging_records
707            .with_label_values(&[sampled_label])
708            .inc_by(1);
709
710        if let Some((sql, accounted)) = match session.qcell_rw(logging) {
711            PreparedStatementLoggingInfo::AlreadyLogged { .. } => None,
712            PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. } => {
713                Some((sql, accounted))
714            }
715        } {
716            if !*accounted {
717                self.metrics
718                    .statement_logging_unsampled_bytes
719                    .with_label_values(&[])
720                    .inc_by(u64::cast_from(sql.len()));
721                if sample {
722                    self.metrics
723                        .statement_logging_actual_bytes
724                        .with_label_values(&[])
725                        .inc_by(u64::cast_from(sql.len()));
726                }
727                *accounted = true;
728            }
729        }
730        if !sample {
731            return None;
732        }
733        let (ps_record, ps_uuid) = self.log_prepared_statement(session, logging)?;
734
735        let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps {
736            lifecycle_timestamps.received
737        } else {
738            self.now()
739        };
740        let now = self.now();
741        let execution_uuid = epoch_to_uuid_v7(&now);
742        self.record_statement_lifecycle_event(
743            &StatementLoggingId(execution_uuid),
744            &StatementLifecycleEvent::ExecutionBegan,
745            began_at,
746        );
747
748        let params = std::iter::zip(params.execute_types.iter(), params.datums.iter())
749            .map(|(r#type, datum)| {
750                mz_pgrepr::Value::from_datum(datum, r#type).map(|val| {
751                    let mut buf = BytesMut::new();
752                    val.encode_text(&mut buf);
753                    String::from_utf8(Into::<Vec<u8>>::into(buf))
754                        .expect("Serialization shouldn't produce non-UTF-8 strings.")
755                })
756            })
757            .collect();
758        let record = StatementBeganExecutionRecord {
759            id: execution_uuid,
760            prepared_statement_id: ps_uuid,
761            sample_rate,
762            params,
763            began_at,
764            application_name: session.application_name().to_string(),
765            transaction_isolation: session.vars().transaction_isolation().to_string(),
766            transaction_id: session
767                .transaction()
768                .inner()
769                .expect("Every statement runs in an explicit or implicit transaction")
770                .id,
771            mz_version: self
772                .catalog()
773                .state()
774                .config()
775                .build_info
776                .human_version(None),
777            // These are not known yet; we'll fill them in later.
778            cluster_id: None,
779            cluster_name: None,
780            execution_timestamp: None,
781            transient_index_id: None,
782            database_name: session.vars().database().into(),
783            search_path: session
784                .vars()
785                .search_path()
786                .iter()
787                .map(|s| s.as_str().to_string())
788                .collect(),
789        };
790        let mseh_update = Self::pack_statement_began_execution_update(&record);
791        self.statement_logging
792            .pending_statement_execution_events
793            .push((mseh_update, Diff::ONE));
794        self.statement_logging
795            .executions_begun
796            .insert(execution_uuid, record);
797        if let Some((ps_record, ps_update)) = ps_record {
798            self.statement_logging
799                .pending_prepared_statement_events
800                .push(ps_update);
801            if let Some(sh) = self
802                .statement_logging
803                .unlogged_sessions
804                .remove(&ps_record.session_id)
805            {
806                let sh_update = Self::pack_session_history_update(&sh);
807                self.statement_logging
808                    .pending_session_events
809                    .push(sh_update);
810            }
811        }
812        Some(StatementLoggingId(execution_uuid))
813    }
814
815    /// Record a new connection event
816    pub fn begin_session_for_statement_logging(&mut self, session: &ConnMeta) {
817        let id = session.uuid();
818        let session_role = session.authenticated_role_id();
819        let event = SessionHistoryEvent {
820            id,
821            connected_at: session.connected_at(),
822            application_name: session.application_name().to_owned(),
823            authenticated_user: self.catalog.get_role(session_role).name.clone(),
824        };
825        self.statement_logging.unlogged_sessions.insert(id, event);
826    }
827
828    pub fn end_session_for_statement_logging(&mut self, uuid: Uuid) {
829        self.statement_logging.unlogged_sessions.remove(&uuid);
830    }
831
832    pub fn record_statement_lifecycle_event(
833        &mut self,
834        id: &StatementLoggingId,
835        event: &StatementLifecycleEvent,
836        when: EpochMillis,
837    ) {
838        if mz_adapter_types::dyncfgs::ENABLE_STATEMENT_LIFECYCLE_LOGGING
839            .get(self.catalog().system_config().dyncfgs())
840        {
841            let row = Self::pack_statement_lifecycle_event(id, event, when);
842            self.statement_logging
843                .pending_statement_lifecycle_events
844                .push(row);
845        }
846    }
847}
848
/// Holds the marker type that seals `PreparedStatementLoggingInfo::StillToLog`.
///
/// The module itself is private, so code outside this file cannot name `Private` and thus
/// cannot build the variant with a struct literal.
mod sealed {
    /// A struct that is purposefully private so folks are forced to use the constructor of an
    /// enum.
    #[derive(Debug, Copy, Clone)]
    pub struct Private;
}