mz_adapter/coord/
statement_logging.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::sync::Arc;
12
13use bytes::BytesMut;
14use mz_controller_types::ClusterId;
15use mz_ore::now::{NowFn, epoch_to_uuid_v7, to_datetime};
16use mz_ore::task::spawn;
17use mz_ore::{cast::CastFrom, cast::CastInto, now::EpochMillis};
18use mz_repr::adt::array::ArrayDimension;
19use mz_repr::adt::timestamp::TimestampLike;
20use mz_repr::{Datum, Diff, GlobalId, Row, RowPacker, Timestamp};
21use mz_sql::ast::display::AstDisplay;
22use mz_sql::ast::{AstInfo, Statement};
23use mz_sql::plan::Params;
24use mz_sql::session::metadata::SessionMetadata;
25use mz_sql_parser::ast::{StatementKind, statement_kind_label_value};
26use mz_storage_client::controller::IntrospectionType;
27use qcell::QCell;
28use rand::SeedableRng;
29use rand::distr::{Bernoulli, Distribution};
30use sha2::{Digest, Sha256};
31use tokio::time::MissedTickBehavior;
32use tracing::debug;
33use uuid::Uuid;
34
35use crate::coord::{ConnMeta, Coordinator};
36use crate::session::{LifecycleTimestamps, Session};
37use crate::statement_logging::{
38    SessionHistoryEvent, StatementBeganExecutionRecord, StatementEndedExecutionReason,
39    StatementEndedExecutionRecord, StatementLifecycleEvent, StatementPreparedRecord,
40};
41
42use super::Message;
43
/// Metadata required for logging a prepared statement.
///
/// A [`PreparedStatementLoggingInfo::StillToLog`] value is replaced by
/// [`PreparedStatementLoggingInfo::AlreadyLogged`] when the statement is marked as logged.
#[derive(Debug)]
pub enum PreparedStatementLoggingInfo {
    /// The statement has already been logged; we don't need to log it
    /// again if a future execution hits the sampling rate; we merely
    /// need to reference the corresponding UUID.
    AlreadyLogged { uuid: Uuid },
    /// The statement has not yet been logged; if a future execution
    /// hits the sampling rate, we need to log it at that point.
    StillToLog {
        /// The SQL text of the statement.
        ///
        /// When built via [`PreparedStatementLoggingInfo::still_to_log`], this is already the
        /// redacted text for statement kinds that may carry sensitive or bulky data
        /// (`CREATE SECRET`, `ALTER SECRET`, `INSERT`, `UPDATE`, `EXECUTE`).
        sql: String,
        /// The SQL text of the statement, redacted to follow our data management
        /// policy
        redacted_sql: String,
        /// When the statement was prepared
        prepared_at: EpochMillis,
        /// The name with which the statement was prepared
        name: String,
        /// The ID of the session that prepared the statement
        session_id: Uuid,
        /// Whether we have already recorded this in the "would have logged" metric
        accounted: bool,
        /// The top-level kind of the statement (e.g., `Select`), or `None` for an empty statement
        kind: Option<StatementKind>,

        /// Private type that forces use of the [`PreparedStatementLoggingInfo::still_to_log`]
        /// constructor.
        _sealed: sealed::Private,
    },
}
75
76impl PreparedStatementLoggingInfo {
77    /// Constructor for the [`PreparedStatementLoggingInfo::StillToLog`] variant that ensures SQL
78    /// statements are properly redacted.
79    pub fn still_to_log<A: AstInfo>(
80        raw_sql: String,
81        stmt: Option<&Statement<A>>,
82        prepared_at: EpochMillis,
83        name: String,
84        session_id: Uuid,
85        accounted: bool,
86    ) -> Self {
87        let kind = stmt.map(StatementKind::from);
88        let sql = match kind {
89            // Always redact SQL statements that may contain sensitive information.
90            // CREATE SECRET and ALTER SECRET statements can contain secret values, so we redact them.
91            // INSERT, UPDATE, and EXECUTE statements can include large amounts of user data, so we redact them for both
92            // data privacy and to avoid logging excessive data.
93            Some(
94                StatementKind::CreateSecret
95                | StatementKind::AlterSecret
96                | StatementKind::Insert
97                | StatementKind::Update
98                | StatementKind::Execute,
99            ) => stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
100            _ => raw_sql,
101        };
102
103        PreparedStatementLoggingInfo::StillToLog {
104            sql,
105            redacted_sql: stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
106            prepared_at,
107            name,
108            session_id,
109            accounted,
110            kind,
111            _sealed: sealed::Private,
112        }
113    }
114}
115
/// Identifier of a statement execution whose beginning was logged.
///
/// Wraps the execution's UUID, which is the key used to look the execution up in
/// `StatementLogging::executions_begun`.
#[derive(Copy, Clone, Debug, Ord, Eq, PartialOrd, PartialEq)]
pub struct StatementLoggingId(Uuid);
118
/// The pair of rows produced when a prepared statement is logged for the first time.
#[derive(Debug)]
pub(crate) struct PreparedStatementEvent {
    /// Row appended to the `PreparedStatementHistory` introspection collection.
    prepared_statement: Row,
    /// Row appended to the `SqlText` introspection collection.
    sql_text: Row,
}
124
/// In-memory state for statement logging: in-flight executions, buffered introspection
/// updates awaiting the next drain, and the token bucket used for throttling.
#[derive(Debug)]
pub(crate) struct StatementLogging {
    /// Information about statement executions that have been logged
    /// but not finished.
    ///
    /// This map needs to have enough state left over to later retract
    /// the system table entries (so that we can update them when the
    /// execution finished.)
    executions_begun: BTreeMap<Uuid, StatementBeganExecutionRecord>,

    /// Information about sessions that have been started, but which
    /// have not yet been logged in `mz_session_history`.
    /// They may be logged as part of a statement being executed (and chosen for logging).
    unlogged_sessions: BTreeMap<Uuid, SessionHistoryEvent>,

    /// A reproducible RNG for deciding whether to sample statement executions.
    /// Only used by tests; otherwise, `rand::rng()` is used.
    /// Controlled by the system var `statement_logging_use_reproducible_rng`.
    reproducible_rng: rand_chacha::ChaCha8Rng,

    /// Buffered updates (insertions and retractions) for the `StatementExecutionHistory`
    /// introspection collection, flushed by `drain_statement_log`.
    pending_statement_execution_events: Vec<(Row, Diff)>,
    /// Buffered rows for the `PreparedStatementHistory` and `SqlText` introspection
    /// collections, flushed by `drain_statement_log`.
    pending_prepared_statement_events: Vec<PreparedStatementEvent>,
    /// Buffered rows for the `SessionHistory` introspection collection, flushed by
    /// `drain_statement_log`.
    pending_session_events: Vec<Row>,
    /// Buffered rows for the `StatementLifecycleHistory` introspection collection, flushed by
    /// `drain_statement_log`.
    pending_statement_lifecycle_events: Vec<Row>,

    /// Clock used to timestamp throttling checks.
    now: NowFn,

    /// The number of bytes that we are allowed to emit for statement logging without being throttled.
    /// Increases at a rate of [`mz_sql::session::vars::STATEMENT_LOGGING_TARGET_DATA_RATE`] per second,
    /// up to a max value of [`mz_sql::session::vars::STATEMENT_LOGGING_MAX_DATA_CREDIT`].
    tokens: u64,
    /// The time, in seconds, of the most recent throttling check. It is updated on every
    /// check, whether or not the check passed, and is used to compute how many tokens have
    /// accrued since then.
    last_logged_ts_seconds: u64,
    /// The number of statements that have been throttled since the last successfully logged statement.
    throttled_count: usize,
}
161
162impl StatementLogging {
163    pub(crate) fn new(now: NowFn) -> Self {
164        let last_logged_ts_seconds = (now)() / 1000;
165        Self {
166            executions_begun: BTreeMap::new(),
167            unlogged_sessions: BTreeMap::new(),
168            reproducible_rng: rand_chacha::ChaCha8Rng::seed_from_u64(42),
169            pending_statement_execution_events: Vec::new(),
170            pending_prepared_statement_events: Vec::new(),
171            pending_session_events: Vec::new(),
172            pending_statement_lifecycle_events: Vec::new(),
173            tokens: 0,
174            last_logged_ts_seconds,
175            now: now.clone(),
176            throttled_count: 0,
177        }
178    }
179
180    /// Check if we need to drop a statement
181    /// due to throttling, and update the number of available tokens appropriately.
182    ///
183    /// Returns `false` if we must throttle this statement, and `true` otherwise.
184    fn throttling_check(
185        &mut self,
186        cost: u64,
187        target_data_rate: u64,
188        max_data_credit: Option<u64>,
189    ) -> bool {
190        let ts = (self.now)() / 1000;
191        // We use saturating_sub here because system time isn't monotonic, causing cases
192        // when last_logged_ts_seconds is greater than ts.
193        let elapsed = ts.saturating_sub(self.last_logged_ts_seconds);
194        self.last_logged_ts_seconds = ts;
195        self.tokens = self
196            .tokens
197            .saturating_add(target_data_rate.saturating_mul(elapsed));
198        if let Some(max_data_credit) = max_data_credit {
199            self.tokens = self.tokens.min(max_data_credit);
200        }
201        if let Some(remaining) = self.tokens.checked_sub(cost) {
202            debug!("throttling check passed. tokens remaining: {remaining}; cost: {cost}");
203            self.tokens = remaining;
204            true
205        } else {
206            debug!(
207                "throttling check failed. tokens available: {}; cost: {cost}",
208                self.tokens
209            );
210            false
211        }
212    }
213}
214
215impl Coordinator {
216    pub(crate) fn spawn_statement_logging_task(&self) {
217        let internal_cmd_tx = self.internal_cmd_tx.clone();
218        spawn(|| "statement_logging", async move {
219            // TODO[btv] make this configurable via LD?
220            // Although... Logging every 5 seconds seems like it
221            // should have acceptable cost for now, since we do a
222            // group commit for tables every 1s anyway.
223            let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
224            interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
225            loop {
226                interval.tick().await;
227                let _ = internal_cmd_tx.send(Message::DrainStatementLog);
228            }
229        });
230    }
231
232    #[mz_ore::instrument(level = "debug")]
233    pub(crate) fn drain_statement_log(&mut self) {
234        let session_updates = std::mem::take(&mut self.statement_logging.pending_session_events)
235            .into_iter()
236            .map(|update| (update, Diff::ONE))
237            .collect();
238        let (prepared_statement_updates, sql_text_updates) =
239            std::mem::take(&mut self.statement_logging.pending_prepared_statement_events)
240                .into_iter()
241                .map(
242                    |PreparedStatementEvent {
243                         prepared_statement,
244                         sql_text,
245                     }| {
246                        ((prepared_statement, Diff::ONE), (sql_text, Diff::ONE))
247                    },
248                )
249                .unzip::<_, _, Vec<_>, Vec<_>>();
250        let statement_execution_updates =
251            std::mem::take(&mut self.statement_logging.pending_statement_execution_events);
252        let statement_lifecycle_updates =
253            std::mem::take(&mut self.statement_logging.pending_statement_lifecycle_events)
254                .into_iter()
255                .map(|update| (update, Diff::ONE))
256                .collect();
257
258        use IntrospectionType::*;
259        for (type_, updates) in [
260            (SessionHistory, session_updates),
261            (PreparedStatementHistory, prepared_statement_updates),
262            (StatementExecutionHistory, statement_execution_updates),
263            (StatementLifecycleHistory, statement_lifecycle_updates),
264            (SqlText, sql_text_updates),
265        ] {
266            if !updates.is_empty() && !self.controller.read_only() {
267                self.controller
268                    .storage
269                    .append_introspection_updates(type_, updates);
270            }
271        }
272    }
273
274    /// Check whether we need to do throttling (i.e., whether `STATEMENT_LOGGING_TARGET_DATA_RATE` is set).
275    /// If so, actually do the check.
276    ///
277    /// We expect `rows` to be the list of rows we intend to record and calculate the cost by summing the
278    /// byte lengths of the rows.
279    ///
280    /// Returns `false` if we must throttle this statement, and `true` otherwise.
281    fn statement_logging_throttling_check<'a, I>(&mut self, rows: I) -> bool
282    where
283        I: IntoIterator<Item = Option<&'a Row>>,
284    {
285        let cost = rows
286            .into_iter()
287            .filter_map(|row_opt| row_opt.map(|row| row.byte_len()))
288            .fold(0_usize, |acc, x| acc.saturating_add(x));
289
290        let Some(target_data_rate) = self
291            .catalog
292            .system_config()
293            .statement_logging_target_data_rate()
294        else {
295            return true;
296        };
297        let max_data_credit = self
298            .catalog
299            .system_config()
300            .statement_logging_max_data_credit();
301        self.statement_logging.throttling_check(
302            cost.cast_into(),
303            target_data_rate.cast_into(),
304            max_data_credit.map(CastInto::cast_into),
305        )
306    }
307
308    /// Marks a prepared statement as "already logged".
309    /// Mutates the `PreparedStatementLoggingInfo` metadata.
310    fn record_prepared_statement_as_logged(
311        &self,
312        uuid: Uuid,
313        session: &mut Session,
314        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
315    ) {
316        let logging = session.qcell_rw(&*logging);
317        if let PreparedStatementLoggingInfo::StillToLog { .. } = logging {
318            *logging = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
319        }
320    }
321
322    /// Returns any statement logging events needed for a particular
323    /// prepared statement. Possibly mutates the `PreparedStatementLoggingInfo` metadata.
324    ///
325    /// This function does not do a sampling check, and assumes we did so in a higher layer.
326    ///
327    ///
328    /// Returns A tuple containing:
329    /// - `Option<(StatementPreparedRecord, PreparedStatementEvent)>`: If the prepared statement
330    ///   has not yet been logged, returns the prepared statement record, the packed row of the
331    ///   prepared statement record, and a row for the SQL text.
332    /// - `Uuid`: The UUID of the prepared statement if the prepared statement has been logged
333    pub(crate) fn get_prepared_statement_info(
334        &self,
335        session: &Session,
336        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
337    ) -> (
338        Option<(StatementPreparedRecord, PreparedStatementEvent)>,
339        Uuid,
340    ) {
341        let logging = session.qcell_ro(&*logging);
342
343        match logging {
344            PreparedStatementLoggingInfo::AlreadyLogged { uuid } => (None, *uuid),
345            PreparedStatementLoggingInfo::StillToLog {
346                sql,
347                redacted_sql,
348                prepared_at,
349                name,
350                session_id,
351                accounted,
352                kind,
353                _sealed: _,
354            } => {
355                assert!(
356                    *accounted,
357                    "accounting for logging should be done in `begin_statement_execution`"
358                );
359                let uuid = epoch_to_uuid_v7(prepared_at);
360                let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();
361                let record = StatementPreparedRecord {
362                    id: uuid,
363                    sql_hash,
364                    name: name.to_string(),
365                    session_id: *session_id,
366                    prepared_at: *prepared_at,
367                    kind: *kind,
368                };
369                let mut mpsh_row = Row::default();
370                let mut mpsh_packer = mpsh_row.packer();
371                Self::pack_statement_prepared_update(&record, &mut mpsh_packer);
372                let sql_row = Row::pack([
373                    Datum::TimestampTz(
374                        to_datetime(*prepared_at)
375                            .truncate_day()
376                            .try_into()
377                            .expect("must fit"),
378                    ),
379                    Datum::Bytes(sql_hash.as_slice()),
380                    Datum::String(sql.as_str()),
381                    Datum::String(redacted_sql.as_str()),
382                ]);
383
384                let throttled_count = self.statement_logging.throttled_count;
385                mpsh_packer.push(Datum::UInt64(throttled_count.try_into().expect("must fit")));
386
387                (
388                    Some((
389                        record,
390                        PreparedStatementEvent {
391                            prepared_statement: mpsh_row,
392                            sql_text: sql_row,
393                        },
394                    )),
395                    uuid,
396                )
397            }
398        }
399    }
400    /// The rate at which statement execution should be sampled.
401    /// This is the value of the session var `statement_logging_sample_rate`,
402    /// constrained by the system var `statement_logging_max_sample_rate`.
403    pub fn statement_execution_sample_rate(&self, session: &Session) -> f64 {
404        let system: f64 = self
405            .catalog()
406            .system_config()
407            .statement_logging_max_sample_rate()
408            .try_into()
409            .expect("value constrained to be convertible to f64");
410        let user: f64 = session
411            .vars()
412            .get_statement_logging_sample_rate()
413            .try_into()
414            .expect("value constrained to be convertible to f64");
415        f64::min(system, user)
416    }
417
418    /// Record the end of statement execution for a statement whose beginning was logged.
419    /// It is an error to call this function for a statement whose beginning was not logged
420    /// (because it was not sampled). Requiring the opaque `StatementLoggingId` type,
421    /// which is only instantiated by `begin_statement_execution` if the statement is actually logged,
422    /// should prevent this.
423    pub fn end_statement_execution(
424        &mut self,
425        id: StatementLoggingId,
426        reason: StatementEndedExecutionReason,
427    ) {
428        let StatementLoggingId(uuid) = id;
429        let now = self.now();
430        let ended_record = StatementEndedExecutionRecord {
431            id: uuid,
432            reason,
433            ended_at: now,
434        };
435
436        let began_record = self
437            .statement_logging
438            .executions_begun
439            .remove(&uuid)
440            .expect(
441                "matched `begin_statement_execution` and `end_statement_execution` invocations",
442            );
443        for (row, diff) in
444            Self::pack_statement_ended_execution_updates(&began_record, &ended_record)
445        {
446            self.statement_logging
447                .pending_statement_execution_events
448                .push((row, diff));
449        }
450        self.record_statement_lifecycle_event(
451            &id,
452            &StatementLifecycleEvent::ExecutionFinished,
453            now,
454        );
455    }
456
457    fn pack_statement_execution_inner(
458        record: &StatementBeganExecutionRecord,
459        packer: &mut RowPacker,
460    ) {
461        let StatementBeganExecutionRecord {
462            id,
463            prepared_statement_id,
464            sample_rate,
465            params,
466            began_at,
467            cluster_id,
468            cluster_name,
469            database_name,
470            search_path,
471            application_name,
472            transaction_isolation,
473            execution_timestamp,
474            transaction_id,
475            transient_index_id,
476            mz_version,
477        } = record;
478
479        let cluster = cluster_id.map(|id| id.to_string());
480        let transient_index_id = transient_index_id.map(|id| id.to_string());
481        packer.extend([
482            Datum::Uuid(*id),
483            Datum::Uuid(*prepared_statement_id),
484            Datum::Float64((*sample_rate).into()),
485            match &cluster {
486                None => Datum::Null,
487                Some(cluster_id) => Datum::String(cluster_id),
488            },
489            Datum::String(&*application_name),
490            cluster_name.as_ref().map(String::as_str).into(),
491            Datum::String(database_name),
492        ]);
493        packer.push_list(search_path.iter().map(|s| Datum::String(s)));
494        packer.extend([
495            Datum::String(&*transaction_isolation),
496            (*execution_timestamp).into(),
497            Datum::UInt64(*transaction_id),
498            match &transient_index_id {
499                None => Datum::Null,
500                Some(transient_index_id) => Datum::String(transient_index_id),
501            },
502        ]);
503        packer
504            .try_push_array(
505                &[ArrayDimension {
506                    lower_bound: 1,
507                    length: params.len(),
508                }],
509                params
510                    .iter()
511                    .map(|p| Datum::from(p.as_ref().map(String::as_str))),
512            )
513            .expect("correct array dimensions");
514        packer.push(Datum::from(mz_version.as_str()));
515        packer.push(Datum::TimestampTz(
516            to_datetime(*began_at).try_into().expect("Sane system time"),
517        ));
518    }
519
520    fn pack_statement_began_execution_update(record: &StatementBeganExecutionRecord) -> Row {
521        let mut row = Row::default();
522        let mut packer = row.packer();
523        Self::pack_statement_execution_inner(record, &mut packer);
524        packer.extend([
525            // finished_at
526            Datum::Null,
527            // finished_status
528            Datum::Null,
529            // error_message
530            Datum::Null,
531            // result_size
532            Datum::Null,
533            // rows_returned
534            Datum::Null,
535            // execution_status
536            Datum::Null,
537        ]);
538        row
539    }
540
541    fn pack_statement_prepared_update(record: &StatementPreparedRecord, packer: &mut RowPacker) {
542        let StatementPreparedRecord {
543            id,
544            session_id,
545            name,
546            sql_hash,
547            prepared_at,
548            kind,
549        } = record;
550        packer.extend([
551            Datum::Uuid(*id),
552            Datum::Uuid(*session_id),
553            Datum::String(name.as_str()),
554            Datum::Bytes(sql_hash.as_slice()),
555            Datum::TimestampTz(to_datetime(*prepared_at).try_into().expect("must fit")),
556            kind.map(statement_kind_label_value).into(),
557        ]);
558    }
559
560    fn pack_session_history_update(event: &SessionHistoryEvent) -> Row {
561        let SessionHistoryEvent {
562            id,
563            connected_at,
564            application_name,
565            authenticated_user,
566        } = event;
567        Row::pack_slice(&[
568            Datum::Uuid(*id),
569            Datum::TimestampTz(
570                mz_ore::now::to_datetime(*connected_at)
571                    .try_into()
572                    .expect("must fit"),
573            ),
574            Datum::String(&*application_name),
575            Datum::String(&*authenticated_user),
576        ])
577    }
578
579    fn pack_statement_lifecycle_event(
580        StatementLoggingId(uuid): &StatementLoggingId,
581        event: &StatementLifecycleEvent,
582        when: EpochMillis,
583    ) -> Row {
584        Row::pack_slice(&[
585            Datum::Uuid(*uuid),
586            Datum::String(event.as_str()),
587            Datum::TimestampTz(mz_ore::now::to_datetime(when).try_into().expect("must fit")),
588        ])
589    }
590
591    pub fn pack_full_statement_execution_update(
592        began_record: &StatementBeganExecutionRecord,
593        ended_record: &StatementEndedExecutionRecord,
594    ) -> Row {
595        let mut row = Row::default();
596        let mut packer = row.packer();
597        Self::pack_statement_execution_inner(began_record, &mut packer);
598        let (status, error_message, result_size, rows_returned, execution_strategy) =
599            match &ended_record.reason {
600                StatementEndedExecutionReason::Success {
601                    result_size,
602                    rows_returned,
603                    execution_strategy,
604                } => (
605                    "success",
606                    None,
607                    result_size.map(|rs| i64::try_from(rs).expect("must fit")),
608                    rows_returned.map(|rr| i64::try_from(rr).expect("must fit")),
609                    execution_strategy.map(|es| es.name()),
610                ),
611                StatementEndedExecutionReason::Canceled => ("canceled", None, None, None, None),
612                StatementEndedExecutionReason::Errored { error } => {
613                    ("error", Some(error.as_str()), None, None, None)
614                }
615                StatementEndedExecutionReason::Aborted => ("aborted", None, None, None, None),
616            };
617        packer.extend([
618            Datum::TimestampTz(
619                to_datetime(ended_record.ended_at)
620                    .try_into()
621                    .expect("Sane system time"),
622            ),
623            status.into(),
624            error_message.into(),
625            result_size.into(),
626            rows_returned.into(),
627            execution_strategy.into(),
628        ]);
629        row
630    }
631
632    pub fn pack_statement_ended_execution_updates(
633        began_record: &StatementBeganExecutionRecord,
634        ended_record: &StatementEndedExecutionRecord,
635    ) -> [(Row, Diff); 2] {
636        let retraction = Self::pack_statement_began_execution_update(began_record);
637        let new = Self::pack_full_statement_execution_update(began_record, ended_record);
638        [(retraction, Diff::MINUS_ONE), (new, Diff::ONE)]
639    }
640
641    /// Mutate a statement execution record via the given function `f`.
642    fn mutate_record<F: FnOnce(&mut StatementBeganExecutionRecord)>(
643        &mut self,
644        StatementLoggingId(id): StatementLoggingId,
645        f: F,
646    ) {
647        let record = self
648            .statement_logging
649            .executions_begun
650            .get_mut(&id)
651            .expect("mutate_record must not be called after execution ends");
652        let retraction = Self::pack_statement_began_execution_update(record);
653        self.statement_logging
654            .pending_statement_execution_events
655            .push((retraction, Diff::MINUS_ONE));
656        f(record);
657        let update = Self::pack_statement_began_execution_update(record);
658        self.statement_logging
659            .pending_statement_execution_events
660            .push((update, Diff::ONE));
661    }
662
663    /// Set the `cluster_id` for a statement, once it's known.
664    pub fn set_statement_execution_cluster(
665        &mut self,
666        id: StatementLoggingId,
667        cluster_id: ClusterId,
668    ) {
669        let cluster_name = self.catalog().get_cluster(cluster_id).name.clone();
670        self.mutate_record(id, |record| {
671            record.cluster_name = Some(cluster_name);
672            record.cluster_id = Some(cluster_id);
673        });
674    }
675
676    /// Set the `execution_timestamp` for a statement, once it's known
677    pub fn set_statement_execution_timestamp(
678        &mut self,
679        id: StatementLoggingId,
680        timestamp: Timestamp,
681    ) {
682        self.mutate_record(id, |record| {
683            record.execution_timestamp = Some(u64::from(timestamp));
684        });
685    }
686
687    pub fn set_transient_index_id(&mut self, id: StatementLoggingId, transient_index_id: GlobalId) {
688        self.mutate_record(id, |record| {
689            record.transient_index_id = Some(transient_index_id)
690        });
691    }
692
693    /// Possibly record the beginning of statement execution, depending on a randomly-chosen value.
694    /// If the execution beginning was indeed logged, returns a `StatementLoggingId` that must be
695    /// passed to `end_statement_execution` to record when it ends.
696    ///
697    /// `lifecycle_timestamps` has timestamps that come from the Adapter frontend (`mz-pgwire`) part
698    /// of the lifecycle.
699    pub fn begin_statement_execution(
700        &mut self,
701        session: &mut Session,
702        params: &Params,
703        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
704        lifecycle_timestamps: Option<LifecycleTimestamps>,
705    ) -> Option<StatementLoggingId> {
706        let enable_internal_statement_logging = self
707            .catalog()
708            .system_config()
709            .enable_internal_statement_logging();
710        if session.user().is_internal() && !enable_internal_statement_logging {
711            return None;
712        }
713        let sample_rate = self.statement_execution_sample_rate(session);
714
715        let distribution = Bernoulli::new(sample_rate).expect("rate must be in range [0, 1]");
716        let sample = if self
717            .catalog()
718            .system_config()
719            .statement_logging_use_reproducible_rng()
720        {
721            distribution.sample(&mut self.statement_logging.reproducible_rng)
722        } else {
723            distribution.sample(&mut rand::rng())
724        };
725
726        // Figure out the cost of everything before we log.
727
728        // Track how many statements we're recording.
729        let sampled_label = sample.then_some("true").unwrap_or("false");
730        self.metrics
731            .statement_logging_records
732            .with_label_values(&[sampled_label])
733            .inc_by(1);
734
735        if let Some((sql, accounted)) = match session.qcell_rw(logging) {
736            PreparedStatementLoggingInfo::AlreadyLogged { .. } => None,
737            PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. } => {
738                Some((sql, accounted))
739            }
740        } {
741            if !*accounted {
742                self.metrics
743                    .statement_logging_unsampled_bytes
744                    .inc_by(u64::cast_from(sql.len()));
745                if sample {
746                    self.metrics
747                        .statement_logging_actual_bytes
748                        .inc_by(u64::cast_from(sql.len()));
749                }
750                *accounted = true;
751            }
752        }
753        if !sample {
754            return None;
755        }
756
757        let (maybe_ps, ps_uuid) = self.get_prepared_statement_info(session, logging);
758
759        let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps {
760            lifecycle_timestamps.received
761        } else {
762            self.now()
763        };
764        let now = self.now();
765        let execution_uuid = epoch_to_uuid_v7(&now);
766
767        let params = std::iter::zip(params.execute_types.iter(), params.datums.iter())
768            .map(|(r#type, datum)| {
769                mz_pgrepr::Value::from_datum(datum, r#type).map(|val| {
770                    let mut buf = BytesMut::new();
771                    val.encode_text(&mut buf);
772                    String::from_utf8(Into::<Vec<u8>>::into(buf))
773                        .expect("Serialization shouldn't produce non-UTF-8 strings.")
774                })
775            })
776            .collect();
777        let record = StatementBeganExecutionRecord {
778            id: execution_uuid,
779            prepared_statement_id: ps_uuid,
780            sample_rate,
781            params,
782            began_at,
783            application_name: session.application_name().to_string(),
784            transaction_isolation: session.vars().transaction_isolation().to_string(),
785            transaction_id: session
786                .transaction()
787                .inner()
788                .expect("Every statement runs in an explicit or implicit transaction")
789                .id,
790            mz_version: self
791                .catalog()
792                .state()
793                .config()
794                .build_info
795                .human_version(None),
796            // These are not known yet; we'll fill them in later.
797            cluster_id: None,
798            cluster_name: None,
799            execution_timestamp: None,
800            transient_index_id: None,
801            database_name: session.vars().database().into(),
802            search_path: session
803                .vars()
804                .search_path()
805                .iter()
806                .map(|s| s.as_str().to_string())
807                .collect(),
808        };
809        let mseh_update = Self::pack_statement_began_execution_update(&record);
810
811        let (maybe_ps_event, maybe_sh_event) = if let Some((ps_record, ps_event)) = maybe_ps {
812            if let Some(sh) = self
813                .statement_logging
814                .unlogged_sessions
815                .get(&ps_record.session_id)
816            {
817                (
818                    Some(ps_event),
819                    Some((Self::pack_session_history_update(sh), ps_record.session_id)),
820                )
821            } else {
822                (Some(ps_event), None)
823            }
824        } else {
825            (None, None)
826        };
827
828        let maybe_ps_prepared_statement = maybe_ps_event.as_ref().map(|e| &e.prepared_statement);
829        let maybe_ps_sql_text = maybe_ps_event.as_ref().map(|e| &e.sql_text);
830
831        if !self.statement_logging_throttling_check([
832            Some(&mseh_update),
833            maybe_ps_prepared_statement,
834            maybe_ps_sql_text,
835            maybe_sh_event.as_ref().map(|(row, _)| row),
836        ]) {
837            self.statement_logging.throttled_count += 1;
838            return None;
839        }
840        // When we successfully log the first instance of a prepared statement
841        // (i.e., it is not throttled), we also capture the number of previously
842        // throttled statement executions in the builtin prepared statement history table above,
843        // and then reset the throttled count for future tracking.
844        else if let PreparedStatementLoggingInfo::StillToLog { .. } = session.qcell_ro(logging) {
845            self.statement_logging.throttled_count = 0;
846        }
847
848        self.record_prepared_statement_as_logged(ps_uuid, session, logging);
849
850        self.record_statement_lifecycle_event(
851            &StatementLoggingId(execution_uuid),
852            &StatementLifecycleEvent::ExecutionBegan,
853            began_at,
854        );
855
856        self.statement_logging
857            .pending_statement_execution_events
858            .push((mseh_update, Diff::ONE));
859        self.statement_logging
860            .executions_begun
861            .insert(execution_uuid, record);
862
863        if let Some((sh_update, session_id)) = maybe_sh_event {
864            self.statement_logging
865                .pending_session_events
866                .push(sh_update);
867            // Mark the session as logged to avoid logging it again in the future
868            self.statement_logging.unlogged_sessions.remove(&session_id);
869        }
870        if let Some(ps_event) = maybe_ps_event {
871            self.statement_logging
872                .pending_prepared_statement_events
873                .push(ps_event);
874        }
875        Some(StatementLoggingId(execution_uuid))
876    }
877
878    /// Record a new connection event
879    pub fn begin_session_for_statement_logging(&mut self, session: &ConnMeta) {
880        let id = session.uuid();
881        let session_role = session.authenticated_role_id();
882        let event = SessionHistoryEvent {
883            id,
884            connected_at: session.connected_at(),
885            application_name: session.application_name().to_owned(),
886            authenticated_user: self.catalog.get_role(session_role).name.clone(),
887        };
888        self.statement_logging.unlogged_sessions.insert(id, event);
889    }
890
891    pub fn end_session_for_statement_logging(&mut self, uuid: Uuid) {
892        self.statement_logging.unlogged_sessions.remove(&uuid);
893    }
894
895    pub fn record_statement_lifecycle_event(
896        &mut self,
897        id: &StatementLoggingId,
898        event: &StatementLifecycleEvent,
899        when: EpochMillis,
900    ) {
901        if mz_adapter_types::dyncfgs::ENABLE_STATEMENT_LIFECYCLE_LOGGING
902            .get(self.catalog().system_config().dyncfgs())
903        {
904            let row = Self::pack_statement_lifecycle_event(id, event, when);
905            self.statement_logging
906                .pending_statement_lifecycle_events
907                .push(row);
908        }
909    }
910}
911
mod sealed {
    /// A deliberately private marker type: because outside code cannot name it,
    /// callers are forced to go through the constructor of an enum instead of
    /// building the value by hand.
    #[derive(Clone, Copy, Debug)]
    pub struct Private;
}