mz_adapter/coord/statement_logging.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::sync::Arc;

use bytes::BytesMut;
use mz_controller_types::ClusterId;
use mz_ore::now::{NowFn, epoch_to_uuid_v7, to_datetime};
use mz_ore::task::spawn;
use mz_ore::{cast::CastFrom, cast::CastInto, now::EpochMillis};
use mz_repr::adt::array::ArrayDimension;
use mz_repr::adt::timestamp::TimestampLike;
use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{AstInfo, Statement};
use mz_sql::plan::Params;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql_parser::ast::{StatementKind, statement_kind_label_value};
use mz_storage_client::controller::IntrospectionType;
use qcell::QCell;
use rand::SeedableRng;
use rand::{distributions::Bernoulli, prelude::Distribution, thread_rng};
use sha2::{Digest, Sha256};
use tokio::time::MissedTickBehavior;
use tracing::debug;
use uuid::Uuid;

use crate::coord::{ConnMeta, Coordinator};
use crate::session::Session;
use crate::statement_logging::{
    SessionHistoryEvent, StatementBeganExecutionRecord, StatementEndedExecutionReason,
    StatementEndedExecutionRecord, StatementLifecycleEvent, StatementPreparedRecord,
};

use super::Message;

/// Metadata required for logging a prepared statement.
#[derive(Debug)]
pub enum PreparedStatementLoggingInfo {
    /// The statement has already been logged; we don't need to log it
    /// again if a future execution hits the sampling rate; we merely
    /// need to reference the corresponding UUID.
    AlreadyLogged { uuid: Uuid },
    /// The statement has not yet been logged; if a future execution
    /// hits the sampling rate, we need to log it at that point.
    StillToLog {
        /// The SQL text of the statement.
        sql: String,
        /// The SQL text of the statement, redacted to follow our data management
        /// policy
        redacted_sql: String,
        /// When the statement was prepared
        prepared_at: EpochMillis,
        /// The name with which the statement was prepared
        name: String,
        /// The ID of the session that prepared the statement
        session_id: Uuid,
        /// Whether we have already recorded this in the "would have logged" metric
        accounted: bool,
        /// The top-level kind of the statement (e.g., `Select`), or `None` for an empty statement
        kind: Option<StatementKind>,

        /// Private type that forces use of the [`PreparedStatementLoggingInfo::still_to_log`]
        /// constructor.
        _sealed: sealed::Private,
    },
}

impl PreparedStatementLoggingInfo {
    /// Constructor for the [`PreparedStatementLoggingInfo::StillToLog`] variant that ensures SQL
    /// statements are properly redacted.
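    ///
    /// For example, a `CREATE SECRET` statement has its secret value redacted even in the
    /// `sql` field, while other statements keep their raw text in `sql` and a redacted copy
    /// in `redacted_sql`.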
    pub fn still_to_log<A: AstInfo>(
        raw_sql: String,
        stmt: Option<&Statement<A>>,
        prepared_at: EpochMillis,
        name: String,
        session_id: Uuid,
        accounted: bool,
    ) -> Self {
        let kind = stmt.map(StatementKind::from);
        let sql = match kind {
            // We __always__ want to redact SQL statements that might contain secret values.
            Some(StatementKind::CreateSecret | StatementKind::AlterSecret) => {
                stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default()
            }
            _ => raw_sql,
        };

        PreparedStatementLoggingInfo::StillToLog {
            sql,
            redacted_sql: stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
            prepared_at,
            name,
            session_id,
            accounted,
            kind,
            _sealed: sealed::Private,
        }
    }
}

#[derive(Copy, Clone, Debug, Ord, Eq, PartialOrd, PartialEq)]
pub struct StatementLoggingId(Uuid);

#[derive(Debug)]
pub(crate) struct PreparedStatementEvent {
    prepared_statement: Row,
    sql_text: Row,
}

#[derive(Debug)]
pub(crate) struct StatementLogging {
    /// Information about statement executions that have been logged
    /// but not finished.
    ///
    /// This map needs to have enough state left over to later retract
    /// the system table entries (so that we can update them when the
    /// execution finishes).
    executions_begun: BTreeMap<Uuid, StatementBeganExecutionRecord>,

    /// Information about sessions that have been started, but which
    /// have not yet been logged in `mz_session_history`.
    /// They may be logged as part of a statement being executed (and chosen for logging).
    unlogged_sessions: BTreeMap<Uuid, SessionHistoryEvent>,

    /// A reproducible RNG for deciding whether to sample statement executions.
    /// Only used by tests; otherwise, `rand::thread_rng()` is used.
    /// Controlled by the system var `statement_logging_use_reproducible_rng`.
    reproducible_rng: rand_chacha::ChaCha8Rng,

    pending_statement_execution_events: Vec<(Row, Diff)>,
    pending_prepared_statement_events: Vec<PreparedStatementEvent>,
    pending_session_events: Vec<Row>,
    pending_statement_lifecycle_events: Vec<Row>,

    now: NowFn,

    /// The number of bytes that we are allowed to emit for statement logging without being throttled.
    /// Increases at a rate of [`mz_sql::session::vars::STATEMENT_LOGGING_TARGET_DATA_RATE`] per second,
    /// up to a max value of [`mz_sql::session::vars::STATEMENT_LOGGING_MAX_DATA_CREDIT`].
    tokens: u64,
    /// The last time at which a statement was logged.
    last_logged_ts_seconds: u64,
    /// The number of statements that have been throttled since the last successfully logged statement.
    throttled_count: usize,
}

impl StatementLogging {
    pub(crate) fn new(now: NowFn) -> Self {
        let last_logged_ts_seconds = (now)() / 1000;
        Self {
            executions_begun: BTreeMap::new(),
            unlogged_sessions: BTreeMap::new(),
            reproducible_rng: rand_chacha::ChaCha8Rng::seed_from_u64(42),
            pending_statement_execution_events: Vec::new(),
            pending_prepared_statement_events: Vec::new(),
            pending_session_events: Vec::new(),
            pending_statement_lifecycle_events: Vec::new(),
            tokens: 0,
            last_logged_ts_seconds,
            now: now.clone(),
            throttled_count: 0,
        }
    }

    /// Check if we need to drop a statement
    /// due to throttling, and update internal data structures appropriately.
    ///
    /// Returns `None` if we must throttle this statement, and `Some(n)` otherwise, where `n`
    /// is the number of statements that were dropped due to throttling before this one.
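    ///
    /// The check is a token bucket: tokens accrue at `target_data_rate` bytes per second of
    /// elapsed time since the last check (optionally capped at `max_data_credit`), and logging
    /// a statement spends `cost` tokens. For example, with a target data rate of 1000 and 2
    /// seconds elapsed, 2000 tokens accrue; a statement costing 1500 bytes is then logged and
    /// 500 tokens remain, while a statement costing 2500 bytes would be dropped and counted as
    /// throttled.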
    fn throttling_check(
        &mut self,
        cost: u64,
        target_data_rate: u64,
        max_data_credit: Option<u64>,
    ) -> Option<usize> {
        let ts = (self.now)() / 1000;
        let elapsed = ts - self.last_logged_ts_seconds;
        self.last_logged_ts_seconds = ts;
        self.tokens = self
            .tokens
            .saturating_add(target_data_rate.saturating_mul(elapsed));
        if let Some(max_data_credit) = max_data_credit {
            self.tokens = self.tokens.min(max_data_credit);
        }
        if let Some(remaining) = self.tokens.checked_sub(cost) {
            debug!("throttling check passed. tokens remaining: {remaining}; cost: {cost}");
            self.tokens = remaining;
            Some(std::mem::take(&mut self.throttled_count))
        } else {
            debug!(
                "throttling check failed. tokens available: {}; cost: {cost}",
                self.tokens
            );
            self.throttled_count += 1;
            None
        }
    }
}

impl Coordinator {
    pub(crate) fn spawn_statement_logging_task(&self) {
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        spawn(|| "statement_logging", async move {
            // TODO[btv] make this configurable via LD?
            // Although... Logging every 5 seconds seems like it
            // should have acceptable cost for now, since we do a
            // group commit for tables every 1s anyway.
            let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
            loop {
                interval.tick().await;
                let _ = internal_cmd_tx.send(Message::DrainStatementLog);
            }
        });
    }

    #[mz_ore::instrument(level = "debug")]
    pub(crate) fn drain_statement_log(&mut self) {
        let session_updates = std::mem::take(&mut self.statement_logging.pending_session_events)
            .into_iter()
            .map(|update| (update, Diff::ONE))
            .collect();
        let (prepared_statement_updates, sql_text_updates) =
            std::mem::take(&mut self.statement_logging.pending_prepared_statement_events)
                .into_iter()
                .map(
                    |PreparedStatementEvent {
                         prepared_statement,
                         sql_text,
                     }| {
                        ((prepared_statement, Diff::ONE), (sql_text, Diff::ONE))
                    },
                )
                .unzip::<_, _, Vec<_>, Vec<_>>();
        let statement_execution_updates =
            std::mem::take(&mut self.statement_logging.pending_statement_execution_events);
        let statement_lifecycle_updates =
            std::mem::take(&mut self.statement_logging.pending_statement_lifecycle_events)
                .into_iter()
                .map(|update| (update, Diff::ONE))
                .collect();

        use IntrospectionType::*;
        for (type_, updates) in [
            (SessionHistory, session_updates),
            (PreparedStatementHistory, prepared_statement_updates),
            (StatementExecutionHistory, statement_execution_updates),
            (StatementLifecycleHistory, statement_lifecycle_updates),
            (SqlText, sql_text_updates),
        ] {
            if !updates.is_empty() && !self.controller.read_only() {
                self.controller
                    .storage
                    .append_introspection_updates(type_, updates);
            }
        }
    }

    /// Check whether we need to do throttling (i.e., whether `STATEMENT_LOGGING_TARGET_DATA_RATE` is set).
    /// If so, actually do the check.
    ///
    /// Returns `None` if we must throttle this statement, and `Some(n)` otherwise, where `n`
    /// is the number of statements that were dropped due to throttling before this one.
    fn statement_logging_throttling_check(&mut self, cost: usize) -> Option<usize> {
        let Some(target_data_rate) = self
            .catalog
            .system_config()
            .statement_logging_target_data_rate()
        else {
            return Some(std::mem::take(&mut self.statement_logging.throttled_count));
        };
        let max_data_credit = self
            .catalog
            .system_config()
            .statement_logging_max_data_credit();
        self.statement_logging.throttling_check(
            cost.cast_into(),
            target_data_rate.cast_into(),
            max_data_credit.map(CastInto::cast_into),
        )
    }

    /// Returns any statement logging events needed for a particular
    /// prepared statement. Possibly mutates the `PreparedStatementLoggingInfo` metadata.
    ///
    /// This function does not do a sampling check; it assumes the caller has already performed one.
    ///
    /// It _does_ do a throttling check, and returns `None` if we must not log due to throttling.
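    ///
    /// On success, returns the UUID identifying the prepared statement, along with the
    /// `StatementPreparedRecord` and the rows to log if this is the first time the prepared
    /// statement is logged (`None` if it was already logged).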
    pub(crate) fn log_prepared_statement(
        &mut self,
        session: &mut Session,
        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
    ) -> Option<(
        Option<(StatementPreparedRecord, PreparedStatementEvent)>,
        Uuid,
    )> {
        let logging = session.qcell_rw(&*logging);
        let mut out = None;

        let uuid = match logging {
            PreparedStatementLoggingInfo::AlreadyLogged { uuid } => *uuid,
            PreparedStatementLoggingInfo::StillToLog {
                sql,
                redacted_sql,
                prepared_at,
                name,
                session_id,
                accounted,
                kind,
                _sealed: _,
            } => {
                assert!(
                    *accounted,
                    "accounting for logging should be done in `begin_statement_execution`"
                );
                let uuid = epoch_to_uuid_v7(prepared_at);
                let sql = std::mem::take(sql);
                let redacted_sql = std::mem::take(redacted_sql);
                let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();
                let record = StatementPreparedRecord {
                    id: uuid,
                    sql_hash,
                    name: std::mem::take(name),
                    session_id: *session_id,
                    prepared_at: *prepared_at,
                    kind: *kind,
                };
                let mut mpsh_row = Row::default();
                let mut mpsh_packer = mpsh_row.packer();
                Self::pack_statement_prepared_update(&record, &mut mpsh_packer);
                let sql_row = Row::pack([
                    Datum::TimestampTz(
                        to_datetime(*prepared_at)
                            .truncate_day()
                            .try_into()
                            .expect("must fit"),
                    ),
                    Datum::Bytes(sql_hash.as_slice()),
                    Datum::String(sql.as_str()),
                    Datum::String(redacted_sql.as_str()),
                ]);

                let cost = mpsh_packer.byte_len() + sql_row.byte_len();
                let throttled_count = self.statement_logging_throttling_check(cost)?;
                mpsh_packer.push(Datum::UInt64(throttled_count.try_into().expect("must fit")));
                out = Some((
                    record,
                    PreparedStatementEvent {
                        prepared_statement: mpsh_row,
                        sql_text: sql_row,
                    },
                ));

                *logging = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
                uuid
            }
        };
        Some((out, uuid))
    }

    /// The rate at which statement execution should be sampled.
    /// This is the value of the session var `statement_logging_sample_rate`,
    /// constrained by the system var `statement_logging_max_sample_rate`.
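    ///
    /// For example, if the session requests a sample rate of 0.5 but the system maximum is 0.1,
    /// executions are sampled at a rate of 0.1.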
    pub fn statement_execution_sample_rate(&self, session: &Session) -> f64 {
        let system: f64 = self
            .catalog()
            .system_config()
            .statement_logging_max_sample_rate()
            .try_into()
            .expect("value constrained to be convertible to f64");
        let user: f64 = session
            .vars()
            .get_statement_logging_sample_rate()
            .try_into()
            .expect("value constrained to be convertible to f64");
        f64::min(system, user)
    }

    /// Record the end of statement execution for a statement whose beginning was logged.
    /// It is an error to call this function for a statement whose beginning was not logged
    /// (because it was not sampled). Requiring the opaque `StatementLoggingId` type,
    /// which is only instantiated by `begin_statement_execution` if the statement is actually logged,
    /// should prevent this.
    pub fn end_statement_execution(
        &mut self,
        id: StatementLoggingId,
        reason: StatementEndedExecutionReason,
    ) {
        let StatementLoggingId(uuid) = id;
        let now = self.now();
        let ended_record = StatementEndedExecutionRecord {
            id: uuid,
            reason,
            ended_at: now,
        };

        let began_record = self
            .statement_logging
            .executions_begun
            .remove(&uuid)
            .expect(
                "matched `begin_statement_execution` and `end_statement_execution` invocations",
            );
        for (row, diff) in
            Self::pack_statement_ended_execution_updates(&began_record, &ended_record)
        {
            self.statement_logging
                .pending_statement_execution_events
                .push((row, diff));
        }
        self.record_statement_lifecycle_event(
            &id,
            &StatementLifecycleEvent::ExecutionFinished,
            now,
        );
    }

    fn pack_statement_execution_inner(
        record: &StatementBeganExecutionRecord,
        packer: &mut RowPacker,
    ) {
        let StatementBeganExecutionRecord {
            id,
            prepared_statement_id,
            sample_rate,
            params,
            began_at,
            cluster_id,
            cluster_name,
            database_name,
            search_path,
            application_name,
            transaction_isolation,
            execution_timestamp,
            transaction_id,
            transient_index_id,
            mz_version,
        } = record;

        let cluster = cluster_id.map(|id| id.to_string());
        let transient_index_id = transient_index_id.map(|id| id.to_string());
        packer.extend([
            Datum::Uuid(*id),
            Datum::Uuid(*prepared_statement_id),
            Datum::Float64((*sample_rate).into()),
            match &cluster {
                None => Datum::Null,
                Some(cluster_id) => Datum::String(cluster_id),
            },
            Datum::String(&*application_name),
            cluster_name.as_ref().map(String::as_str).into(),
            Datum::String(database_name),
        ]);
        packer.push_list(search_path.iter().map(|s| Datum::String(s)));
        packer.extend([
            Datum::String(&*transaction_isolation),
            (*execution_timestamp).into(),
            Datum::UInt64(*transaction_id),
            match &transient_index_id {
                None => Datum::Null,
                Some(transient_index_id) => Datum::String(transient_index_id),
            },
        ]);
        packer
            .try_push_array(
                &[ArrayDimension {
                    lower_bound: 1,
                    length: params.len(),
                }],
                params
                    .iter()
                    .map(|p| Datum::from(p.as_ref().map(String::as_str))),
            )
            .expect("correct array dimensions");
        packer.push(Datum::from(mz_version.as_str()));
        packer.push(Datum::TimestampTz(
            to_datetime(*began_at).try_into().expect("Sane system time"),
        ));
    }

    fn pack_statement_began_execution_update(record: &StatementBeganExecutionRecord) -> Row {
        let mut row = Row::default();
        let mut packer = row.packer();
        Self::pack_statement_execution_inner(record, &mut packer);
        packer.extend([
            // finished_at
            Datum::Null,
            // finished_status
            Datum::Null,
            // error_message
            Datum::Null,
            // result_size
            Datum::Null,
            // rows_returned
            Datum::Null,
            // execution_strategy
            Datum::Null,
        ]);
        row
    }

    fn pack_statement_prepared_update(record: &StatementPreparedRecord, packer: &mut RowPacker) {
        let StatementPreparedRecord {
            id,
            session_id,
            name,
            sql_hash,
            prepared_at,
            kind,
        } = record;
        packer.extend([
            Datum::Uuid(*id),
            Datum::Uuid(*session_id),
            Datum::String(name.as_str()),
            Datum::Bytes(sql_hash.as_slice()),
            Datum::TimestampTz(to_datetime(*prepared_at).try_into().expect("must fit")),
            kind.map(statement_kind_label_value).into(),
        ]);
    }

    fn pack_session_history_update(event: &SessionHistoryEvent) -> Row {
        let SessionHistoryEvent {
            id,
            connected_at,
            application_name,
            authenticated_user,
        } = event;
        Row::pack_slice(&[
            Datum::Uuid(*id),
            Datum::TimestampTz(
                mz_ore::now::to_datetime(*connected_at)
                    .try_into()
                    .expect("must fit"),
            ),
            Datum::String(&*application_name),
            Datum::String(&*authenticated_user),
        ])
    }

    fn pack_statement_lifecycle_event(
        StatementLoggingId(uuid): &StatementLoggingId,
        event: &StatementLifecycleEvent,
        when: EpochMillis,
    ) -> Row {
        Row::pack_slice(&[
            Datum::Uuid(*uuid),
            Datum::String(event.as_str()),
            Datum::TimestampTz(mz_ore::now::to_datetime(when).try_into().expect("must fit")),
        ])
    }

    pub fn pack_full_statement_execution_update(
        began_record: &StatementBeganExecutionRecord,
        ended_record: &StatementEndedExecutionRecord,
    ) -> Row {
        let mut row = Row::default();
        let mut packer = row.packer();
        Self::pack_statement_execution_inner(began_record, &mut packer);
        let (status, error_message, result_size, rows_returned, execution_strategy) =
            match &ended_record.reason {
                StatementEndedExecutionReason::Success {
                    result_size,
                    rows_returned,
                    execution_strategy,
                } => (
                    "success",
                    None,
                    result_size.map(|rs| i64::try_from(rs).expect("must fit")),
                    rows_returned.map(|rr| i64::try_from(rr).expect("must fit")),
                    execution_strategy.map(|es| es.name()),
                ),
                StatementEndedExecutionReason::Canceled => ("canceled", None, None, None, None),
                StatementEndedExecutionReason::Errored { error } => {
                    ("error", Some(error.as_str()), None, None, None)
                }
                StatementEndedExecutionReason::Aborted => ("aborted", None, None, None, None),
            };
        packer.extend([
            Datum::TimestampTz(
                to_datetime(ended_record.ended_at)
                    .try_into()
                    .expect("Sane system time"),
            ),
            status.into(),
            error_message.into(),
            result_size.into(),
            rows_returned.into(),
            execution_strategy.into(),
        ]);
        row
    }

    pub fn pack_statement_ended_execution_updates(
        began_record: &StatementBeganExecutionRecord,
        ended_record: &StatementEndedExecutionRecord,
    ) -> [(Row, Diff); 2] {
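        // Retract the provisional "began execution" row and emit the finalized row, so the
        // statement execution history ends up with exactly one row per execution.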
        let retraction = Self::pack_statement_began_execution_update(began_record);
        let new = Self::pack_full_statement_execution_update(began_record, ended_record);
        [(retraction, Diff::MINUS_ONE), (new, Diff::ONE)]
    }

    /// Mutate a statement execution record via the given function `f`.
    fn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>(
        &mut self,
        StatementLoggingId(id): StatementLoggingId,
        f: F,
    ) {
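        // Update the logged row differentially: retract the previously emitted row, apply the
        // mutation, and emit the updated row.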
        let record = self
            .statement_logging
            .executions_begun
            .get_mut(&id)
            .expect("mutate_record must not be called after execution ends");
        let retraction = Self::pack_statement_began_execution_update(record);
        self.statement_logging
            .pending_statement_execution_events
            .push((retraction, Diff::MINUS_ONE));
        f(record);
        let update = Self::pack_statement_began_execution_update(record);
        self.statement_logging
            .pending_statement_execution_events
            .push((update, Diff::ONE));
    }

    /// Set the `cluster_id` for a statement, once it's known.
    pub fn set_statement_execution_cluster(
        &mut self,
        id: StatementLoggingId,
        cluster_id: ClusterId,
    ) {
        let cluster_name = self.catalog().get_cluster(cluster_id).name.clone();
        self.mutate_record(id, |record| {
            record.cluster_name = Some(cluster_name);
            record.cluster_id = Some(cluster_id);
        });
    }

    /// Set the `execution_timestamp` for a statement, once it's known
    pub fn set_statement_execution_timestamp(
        &mut self,
        id: StatementLoggingId,
        timestamp: Timestamp,
    ) {
        self.mutate_record(id, |record| {
            record.execution_timestamp = Some(u64::from(timestamp));
        });
    }

    pub fn set_transient_index_id(&mut self, id: StatementLoggingId, transient_index_id: GlobalId) {
        self.mutate_record(id, |record| {
            record.transient_index_id = Some(transient_index_id)
        });
    }

    /// Possibly record the beginning of statement execution, depending on a randomly-chosen value.
    /// If the execution beginning was indeed logged, returns a `StatementLoggingId` that must be
    /// passed to `end_statement_execution` to record when it ends.
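    ///
    /// Returns `None` if the execution is not logged: because the session belongs to an internal
    /// user (and internal statement logging is disabled), because the execution was not sampled,
    /// or because it was throttled.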
    pub fn begin_statement_execution(
        &mut self,
        session: &mut Session,
        params: &Params,
        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
    ) -> Option<StatementLoggingId> {
        let enable_internal_statement_logging = self
            .catalog()
            .system_config()
            .enable_internal_statement_logging();
        if session.user().is_internal() && !enable_internal_statement_logging {
            return None;
        }
        let sample_rate = self.statement_execution_sample_rate(session);

        let distribution = Bernoulli::new(sample_rate).expect("rate must be in range [0, 1]");
        let sample = if self
            .catalog()
            .system_config()
            .statement_logging_use_reproducible_rng()
        {
            distribution.sample(&mut self.statement_logging.reproducible_rng)
        } else {
            distribution.sample(&mut thread_rng())
        };

        // Track how many statements we're recording.
        let sampled_label = sample.then_some("true").unwrap_or("false");
        self.metrics
            .statement_logging_records
            .with_label_values(&[sampled_label])
            .inc_by(1);

        if let Some((sql, accounted)) = match session.qcell_rw(logging) {
            PreparedStatementLoggingInfo::AlreadyLogged { .. } => None,
            PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. } => {
                Some((sql, accounted))
            }
        } {
            if !*accounted {
                self.metrics
                    .statement_logging_unsampled_bytes
                    .with_label_values(&[])
                    .inc_by(u64::cast_from(sql.len()));
                if sample {
                    self.metrics
                        .statement_logging_actual_bytes
                        .with_label_values(&[])
                        .inc_by(u64::cast_from(sql.len()));
                }
                *accounted = true;
            }
        }
        if !sample {
            return None;
        }
        let (ps_record, ps_uuid) = self.log_prepared_statement(session, logging)?;

        // Generate a fresh ID for this execution; `ps_uuid` identifies the prepared statement,
        // which may be executed many times.
        let now = self.now();
        let uuid = epoch_to_uuid_v7(&now);
        self.record_statement_lifecycle_event(
            &StatementLoggingId(uuid),
            &StatementLifecycleEvent::ExecutionBegan,
            now,
        );

        let params = std::iter::zip(params.types.iter(), params.datums.iter())
            .map(|(r#type, datum)| {
                mz_pgrepr::Value::from_datum(datum, r#type).map(|val| {
                    let mut buf = BytesMut::new();
                    val.encode_text(&mut buf);
                    String::from_utf8(Into::<Vec<u8>>::into(buf))
                        .expect("Serialization shouldn't produce non-UTF-8 strings.")
                })
            })
            .collect();
        let record = StatementBeganExecutionRecord {
            id: uuid,
            prepared_statement_id: ps_uuid,
            sample_rate,
            params,
            began_at: self.now(),
            application_name: session.application_name().to_string(),
            transaction_isolation: session.vars().transaction_isolation().to_string(),
            transaction_id: session
                .transaction()
                .inner()
                .expect("Every statement runs in an explicit or implicit transaction")
                .id,
            mz_version: self
                .catalog()
                .state()
                .config()
                .build_info
                .human_version(None),
            // These are not known yet; we'll fill them in later.
            cluster_id: None,
            cluster_name: None,
            execution_timestamp: None,
            transient_index_id: None,
            database_name: session.vars().database().into(),
            search_path: session
                .vars()
                .search_path()
                .iter()
                .map(|s| s.as_str().to_string())
                .collect(),
        };
        let mseh_update = Self::pack_statement_began_execution_update(&record);
        self.statement_logging
            .pending_statement_execution_events
            .push((mseh_update, Diff::ONE));
        self.statement_logging.executions_begun.insert(uuid, record);
        if let Some((ps_record, ps_update)) = ps_record {
            self.statement_logging
                .pending_prepared_statement_events
                .push(ps_update);
            if let Some(sh) = self
                .statement_logging
                .unlogged_sessions
                .remove(&ps_record.session_id)
            {
                let sh_update = Self::pack_session_history_update(&sh);
                self.statement_logging
                    .pending_session_events
                    .push(sh_update);
            }
        }
        Some(StatementLoggingId(uuid))
    }

    /// Record a new connection event
    pub fn begin_session_for_statement_logging(&mut self, session: &ConnMeta) {
        let id = session.uuid();
        let session_role = session.authenticated_role_id();
        let event = SessionHistoryEvent {
            id,
            connected_at: session.connected_at(),
            application_name: session.application_name().to_owned(),
            authenticated_user: self.catalog.get_role(session_role).name.clone(),
        };
        self.statement_logging.unlogged_sessions.insert(id, event);
    }

    pub fn end_session_for_statement_logging(&mut self, uuid: Uuid) {
        self.statement_logging.unlogged_sessions.remove(&uuid);
    }

    pub fn record_statement_lifecycle_event(
        &mut self,
        id: &StatementLoggingId,
        event: &StatementLifecycleEvent,
        when: EpochMillis,
    ) {
        if mz_adapter_types::dyncfgs::ENABLE_STATEMENT_LIFECYCLE_LOGGING
            .get(self.catalog().system_config().dyncfgs())
        {
            let row = Self::pack_statement_lifecycle_event(id, event, when);
            self.statement_logging
                .pending_statement_lifecycle_events
                .push(row);
        }
    }
}

mod sealed {
    /// A struct that is purposefully private so folks are forced to use the constructor of an
    /// enum.
    #[derive(Debug, Copy, Clone)]
    pub struct Private;
}
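
// The test below is a minimal sketch illustrating the token-bucket behavior of
// `StatementLogging::throttling_check`. It assumes that `NowFn` can be constructed from a
// closure (as is done elsewhere in `mz_ore`) so that the clock can be advanced manually.
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    use mz_ore::now::NowFn;

    use super::StatementLogging;

    #[mz_ore::test]
    fn throttling_check_token_bucket() {
        // A manually controlled clock, in milliseconds since the Unix epoch.
        let clock = Arc::new(AtomicU64::new(0));
        let clock_read = Arc::clone(&clock);
        let now = NowFn::from(move || clock_read.load(Ordering::SeqCst));
        let mut logging = StatementLogging::new(now);

        // No time has elapsed, so no tokens have accrued: the statement is throttled.
        assert_eq!(logging.throttling_check(100, 1000, None), None);

        // Advance the clock by one second: 1000 tokens accrue at a target data rate of
        // 1000 bytes/s, which covers the 100-byte cost, and the one previously throttled
        // statement is reported.
        clock.store(1_000, Ordering::SeqCst);
        assert_eq!(logging.throttling_check(100, 1000, None), Some(1));
    }
}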