// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeSet;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};

use bytes::BytesMut;
use mz_catalog::memory::objects::CatalogItem;
use mz_controller_types::ClusterId;
use mz_ore::cast::{CastFrom, CastInto};
use mz_ore::now::{EpochMillis, NowFn, epoch_to_uuid_v7, to_datetime};
use mz_ore::soft_panic_or_log;
use mz_repr::adt::array::ArrayDimension;
use mz_repr::adt::timestamp::TimestampLike;
use mz_repr::{Datum, GlobalId, Row, RowIterator, RowPacker, Timestamp};
use mz_sql::ast::display::AstDisplay;
use mz_sql::ast::{AstInfo, Statement};
use mz_sql::plan::Params;
use mz_sql::session::metadata::SessionMetadata;
use mz_sql::session::vars::SystemVars;
use mz_sql_parser::ast::{StatementKind, statement_kind_label_value};
use qcell::QCell;
use rand::distr::{Bernoulli, Distribution};
use sha2::{Digest, Sha256};
use uuid::Uuid;

use crate::catalog::CatalogState;
use crate::session::{LifecycleTimestamps, Session, TransactionId};
use crate::{AdapterError, CollectionIdBundle, ExecuteResponse};

#[derive(Clone, Debug)]
pub enum StatementLifecycleEvent {
    ExecutionBegan,
    OptimizationFinished,
    StorageDependenciesFinished,
    ComputeDependenciesFinished,
    ExecutionFinished,
}

impl StatementLifecycleEvent {
    pub fn as_str(&self) -> &str {
        match self {
            Self::ExecutionBegan => "execution-began",
            Self::OptimizationFinished => "optimization-finished",
            Self::StorageDependenciesFinished => "storage-dependencies-finished",
            Self::ComputeDependenciesFinished => "compute-dependencies-finished",
            Self::ExecutionFinished => "execution-finished",
        }
    }
}

/// Contains all the information necessary to generate the initial
/// entry in `mz_statement_execution_history`. We need to keep this
/// around in order to modify the entry later once the statement finishes executing.
#[derive(Clone, Debug)]
pub struct StatementBeganExecutionRecord {
    pub id: Uuid,
    pub prepared_statement_id: Uuid,
    pub sample_rate: f64,
    pub params: Vec<Option<String>>,
    pub began_at: EpochMillis,
    pub cluster_id: Option<ClusterId>,
    pub cluster_name: Option<String>,
    pub database_name: String,
    pub search_path: Vec<String>,
    pub application_name: String,
    pub transaction_isolation: String,
    pub execution_timestamp: Option<EpochMillis>,
    pub transaction_id: TransactionId,
    pub transient_index_id: Option<GlobalId>,
    pub mz_version: String,
}

#[derive(Clone, Copy, Debug)]
pub enum StatementExecutionStrategy {
    /// The statement was executed by spinning up a dataflow.
    Standard,
    /// The statement was executed by reading from an existing
    /// arrangement.
    FastPath,
    /// Experimental: The statement was executed by reading from an existing
    /// persist collection.
    PersistFastPath,
    /// The statement was determined to be constant by
    /// environmentd, and not sent to a cluster.
    Constant,
}

impl StatementExecutionStrategy {
    pub fn name(&self) -> &'static str {
        match self {
            Self::Standard => "standard",
            Self::FastPath => "fast-path",
            Self::PersistFastPath => "persist-fast-path",
            Self::Constant => "constant",
        }
    }
}

#[derive(Clone, Debug)]
pub enum StatementEndedExecutionReason {
    Success {
        result_size: Option<u64>,
        rows_returned: Option<u64>,
        execution_strategy: Option<StatementExecutionStrategy>,
    },
    Canceled,
    Errored {
        error: String,
    },
    Aborted,
}

#[derive(Clone, Debug)]
pub struct StatementEndedExecutionRecord {
    pub id: Uuid,
    pub reason: StatementEndedExecutionReason,
    pub ended_at: EpochMillis,
}
/// Contains all the information necessary to generate an entry in
/// `mz_prepared_statement_history`.
#[derive(Clone, Debug)]
pub(crate) struct StatementPreparedRecord {
    pub id: Uuid,
    pub sql_hash: [u8; 32],
    pub name: String,
    pub session_id: Uuid,
    pub prepared_at: EpochMillis,
    pub kind: Option<StatementKind>,
}

#[derive(Clone, Debug)]
pub(crate) struct SessionHistoryEvent {
    pub id: Uuid,
    pub connected_at: EpochMillis,
    pub application_name: String,
    pub authenticated_user: String,
}

impl From<&Result<ExecuteResponse, AdapterError>> for StatementEndedExecutionReason {
    fn from(value: &Result<ExecuteResponse, AdapterError>) -> StatementEndedExecutionReason {
        match value {
            Ok(resp) => resp.into(),
            Err(e) => StatementEndedExecutionReason::Errored {
                error: e.to_string(),
            },
        }
    }
}

impl From<&ExecuteResponse> for StatementEndedExecutionReason {
    fn from(value: &ExecuteResponse) -> StatementEndedExecutionReason {
        match value {
            ExecuteResponse::CopyTo { resp, .. } => match resp.as_ref() {
                // NB [btv]: It's not clear that this combination
                // can ever actually happen.
                ExecuteResponse::SendingRowsImmediate { rows, .. } => {
                    // Note(parkmycar): It potentially feels bad here to iterate over the entire
                    // iterator _just_ to get the encoded result size. As noted above, it's not
                    // entirely clear this case ever happens, so the simplicity is worth it.
                    let result_size: usize = rows.box_clone().map(|row| row.byte_len()).sum();
                    StatementEndedExecutionReason::Success {
                        result_size: Some(u64::cast_from(result_size)),
                        rows_returned: Some(u64::cast_from(rows.count())),
                        execution_strategy: Some(StatementExecutionStrategy::Constant),
                    }
                }
                ExecuteResponse::SendingRowsStreaming { .. } => {
                    panic!("SELECTs terminate on peek finalization, not here.")
                }
                ExecuteResponse::Subscribing { .. } => {
                    panic!("SUBSCRIBEs terminate in the protocol layer, not here.")
                }
                _ => panic!("Invalid COPY response type"),
            },
            ExecuteResponse::CopyFrom { .. } => {
                panic!("COPY FROMs terminate in the protocol layer, not here.")
            }
            ExecuteResponse::Fetch { .. } => {
                panic!("FETCHes terminate after a follow-up message is sent.")
            }
            ExecuteResponse::SendingRowsStreaming { .. } => {
                panic!("SELECTs terminate on peek finalization, not here.")
            }
            ExecuteResponse::Subscribing { .. } => {
                panic!("SUBSCRIBEs terminate in the protocol layer, not here.")
            }

            ExecuteResponse::SendingRowsImmediate { rows, .. } => {
                // Note(parkmycar): It potentially feels bad here to iterate over the entire
                // iterator _just_ to get the encoded result size, but the number of Rows
                // returned here shouldn't be too large. An alternative is to pre-compute some
                // of the result size, but that would require always decoding Rows to handle
                // projecting away columns, which hurts performance for much larger responses.
                let result_size: usize = rows.box_clone().map(|row| row.byte_len()).sum();
                StatementEndedExecutionReason::Success {
                    result_size: Some(u64::cast_from(result_size)),
                    rows_returned: Some(u64::cast_from(rows.count())),
                    execution_strategy: Some(StatementExecutionStrategy::Constant),
                }
            }

            ExecuteResponse::AlteredDefaultPrivileges
            | ExecuteResponse::AlteredObject(_)
            | ExecuteResponse::AlteredRole
            | ExecuteResponse::AlteredSystemConfiguration
            | ExecuteResponse::ClosedCursor
            | ExecuteResponse::Comment
            | ExecuteResponse::Copied(_)
            | ExecuteResponse::CreatedConnection
            | ExecuteResponse::CreatedDatabase
            | ExecuteResponse::CreatedSchema
            | ExecuteResponse::CreatedRole
            | ExecuteResponse::CreatedCluster
            | ExecuteResponse::CreatedClusterReplica
            | ExecuteResponse::CreatedIndex
            | ExecuteResponse::CreatedIntrospectionSubscribe
            | ExecuteResponse::CreatedSecret
            | ExecuteResponse::CreatedSink
            | ExecuteResponse::CreatedSource
            | ExecuteResponse::CreatedTable
            | ExecuteResponse::CreatedView
            | ExecuteResponse::CreatedViews
            | ExecuteResponse::CreatedMaterializedView
            | ExecuteResponse::CreatedContinualTask
            | ExecuteResponse::CreatedType
            | ExecuteResponse::CreatedNetworkPolicy
            | ExecuteResponse::Deallocate { .. }
            | ExecuteResponse::DeclaredCursor
            | ExecuteResponse::Deleted(_)
            | ExecuteResponse::DiscardedTemp
            | ExecuteResponse::DiscardedAll
            | ExecuteResponse::DroppedObject(_)
            | ExecuteResponse::DroppedOwned
            | ExecuteResponse::EmptyQuery
            | ExecuteResponse::GrantedPrivilege
            | ExecuteResponse::GrantedRole
            | ExecuteResponse::Inserted(_)
            | ExecuteResponse::Prepare
            | ExecuteResponse::Raised
            | ExecuteResponse::ReassignOwned
            | ExecuteResponse::RevokedPrivilege
            | ExecuteResponse::RevokedRole
            | ExecuteResponse::SetVariable { .. }
            | ExecuteResponse::StartedTransaction
            | ExecuteResponse::TransactionCommitted { .. }
            | ExecuteResponse::TransactionRolledBack { .. }
            | ExecuteResponse::Updated(_)
            | ExecuteResponse::ValidatedConnection { .. } => {
                StatementEndedExecutionReason::Success {
                    result_size: None,
                    rows_returned: None,
                    execution_strategy: None,
                }
            }
        }
    }
}

mod sealed {
    /// A struct that is purposefully private so that code outside this module is
    /// forced to go through `PreparedStatementLoggingInfo::still_to_log` rather
    /// than constructing the `StillToLog` variant directly.
    #[derive(Debug, Copy, Clone)]
    pub struct Private;
}

/// Metadata required for logging a prepared statement.
#[derive(Debug)]
pub enum PreparedStatementLoggingInfo {
    /// The statement has already been logged; we don't need to log it
    /// again if a future execution hits the sampling rate; we merely
    /// need to reference the corresponding UUID.
    AlreadyLogged { uuid: Uuid },
    /// The statement has not yet been logged; if a future execution
    /// hits the sampling rate, we need to log it at that point.
    StillToLog {
        /// The SQL text of the statement.
        sql: String,
        /// The SQL text of the statement, redacted to follow our data management
        /// policy.
        redacted_sql: String,
        /// When the statement was prepared.
        prepared_at: EpochMillis,
        /// The name with which the statement was prepared.
        name: String,
        /// The ID of the session that prepared the statement.
        session_id: Uuid,
        /// Whether we have already recorded this in the "would have logged" metric.
        accounted: bool,
        /// The top-level kind of the statement (e.g., `Select`), or `None` for an empty statement.
        kind: Option<StatementKind>,

        /// Private type that forces use of the [`PreparedStatementLoggingInfo::still_to_log`]
        /// constructor.
        _sealed: sealed::Private,
    },
}

impl PreparedStatementLoggingInfo {
    /// Constructor for the [`PreparedStatementLoggingInfo::StillToLog`] variant that ensures SQL
    /// statements are properly redacted.
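    ///
    /// A minimal sketch (hypothetical, not from this crate's tests): a
    /// `CREATE SECRET` statement is stored redacted, so the secret value
    /// never reaches the statement log. Here `stmt` is assumed to be the
    /// parsed AST of the SQL text, and the other arguments are illustrative.
    ///
    /// ```ignore
    /// let info = PreparedStatementLoggingInfo::still_to_log(
    ///     "CREATE SECRET s AS 'hunter2'".into(),
    ///     Some(&stmt),
    ///     prepared_at_millis,
    ///     "".into(),
    ///     session_id,
    ///     false,
    /// );
    /// // The stored SQL is `stmt.to_ast_string_redacted()`, not the raw text.
    /// ```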
    pub fn still_to_log<A: AstInfo>(
        raw_sql: String,
        stmt: Option<&Statement<A>>,
        prepared_at: EpochMillis,
        name: String,
        session_id: Uuid,
        accounted: bool,
    ) -> Self {
        let kind = stmt.map(StatementKind::from);
        let sql = match kind {
            // Always redact SQL statements that may contain sensitive information.
            // CREATE SECRET and ALTER SECRET statements can contain secret values, so we redact them.
            // INSERT, UPDATE, and EXECUTE statements can include large amounts of user data, so we redact them for both
            // data privacy and to avoid logging excessive data.
            Some(
                StatementKind::CreateSecret
                | StatementKind::AlterSecret
                | StatementKind::Insert
                | StatementKind::Update
                | StatementKind::Execute,
            ) => stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
            _ => raw_sql,
        };

        PreparedStatementLoggingInfo::StillToLog {
            sql,
            redacted_sql: stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
            prepared_at,
            name,
            session_id,
            accounted,
            kind,
            _sealed: sealed::Private,
        }
    }
}

#[derive(Copy, Clone, Debug, Ord, Eq, PartialOrd, PartialEq)]
pub struct StatementLoggingId(pub Uuid);

/// Rows to be written to `mz_prepared_statement_history` and `mz_sql_text`, with the session id.
#[derive(Debug, Clone)]
pub struct PreparedStatementEvent {
    pub prepared_statement: Row,
    pub sql_text: Row,
    pub session_id: Uuid,
}

/// Throttling state for statement logging, shared across multiple frontend tasks (and currently
/// also shared with the old peek sequencing).
#[derive(Debug)]
pub struct ThrottlingState {
    /// Inner state protected by a mutex, because the two inner fields have to be
    /// manipulated together atomically when rate-limiting.
    /// This mutex is locked once per unsampled query. (There is both sampling and throttling;
    /// sampling happens before throttling.) This should be fine for now: our QPS will stay in
    /// the tens of thousands at most, and a mutex can sustain hundreds of thousands of lock
    /// acquisitions per second, even with some contention. If this ever becomes an issue, we
    /// could redesign throttling to be per-session or per-tokio-worker-thread.
    inner: Mutex<ThrottlingStateInner>,
    /// The number of statements that have been throttled since the last successfully logged
    /// statement. This is not needed for the throttling decision itself, so it can be a separate
    /// atomic to allow reading/writing without acquiring the inner mutex.
    throttled_count: std::sync::atomic::AtomicUsize,
}

#[derive(Debug)]
struct ThrottlingStateInner {
    /// The number of bytes that we are allowed to emit for statement logging without being throttled.
    /// Increases at a rate of [`mz_sql::session::vars::STATEMENT_LOGGING_TARGET_DATA_RATE`] per second,
    /// up to a max value of [`mz_sql::session::vars::STATEMENT_LOGGING_MAX_DATA_CREDIT`].
    tokens: u64,
    /// The last time at which a statement was logged.
    last_logged_ts_seconds: u64,
}

impl ThrottlingState {
    /// Create a new throttling state.
    pub fn new(now: &NowFn) -> Self {
        Self {
            inner: Mutex::new(ThrottlingStateInner {
                tokens: 0,
                last_logged_ts_seconds: now() / 1000,
            }),
            throttled_count: std::sync::atomic::AtomicUsize::new(0),
        }
    }

    /// Check whether we need to drop a statement due to throttling, and update the number of
    /// available tokens appropriately.
    ///
    /// Returns `false` if we must throttle this statement, and `true` otherwise.
    /// Note: `throttled_count` is NOT modified by this method; callers are responsible
    /// for incrementing it on throttle failure and resetting it when appropriate.
    pub fn throttling_check(
        &self,
        cost: u64,
        target_data_rate: u64,
        max_data_credit: Option<u64>,
        now: &NowFn,
    ) -> bool {
        let ts = now() / 1000;
        let mut inner = self.inner.lock().expect("throttling state lock poisoned");
        // We use saturating_sub here because system time isn't monotonic, so
        // `last_logged_ts_seconds` can be greater than `ts`.
        let elapsed = ts.saturating_sub(inner.last_logged_ts_seconds);
        inner.last_logged_ts_seconds = ts;
        inner.tokens = inner
            .tokens
            .saturating_add(target_data_rate.saturating_mul(elapsed));
        if let Some(max_data_credit) = max_data_credit {
            inner.tokens = inner.tokens.min(max_data_credit);
        }
        if let Some(remaining) = inner.tokens.checked_sub(cost) {
            tracing::debug!("throttling check passed. tokens remaining: {remaining}; cost: {cost}");
            inner.tokens = remaining;
            true
        } else {
            tracing::debug!(
                "throttling check failed. tokens available: {}; cost: {cost}",
                inner.tokens
            );
            false
        }
    }

    pub fn get_throttled_count(&self) -> usize {
        self.throttled_count.load(Ordering::Relaxed)
    }

    pub fn increment_throttled_count(&self) {
        self.throttled_count.fetch_add(1, Ordering::Relaxed);
    }

    pub fn reset_throttled_count(&self) {
        self.throttled_count.store(0, Ordering::Relaxed);
    }
}
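
// A minimal sketch (not part of the original file) demonstrating the
// token-bucket behavior of `throttling_check` above, using a manually
// advanced clock. It assumes `NowFn` can be built from a closure via
// `From`, as provided by `mz_ore::now`.
#[cfg(test)]
mod throttling_sketch {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicU64, Ordering};

    use mz_ore::now::NowFn;

    use super::ThrottlingState;

    #[mz_ore::test]
    fn token_bucket_accrues_per_second() {
        // A manually advanced clock, in milliseconds.
        let clock = Arc::new(AtomicU64::new(0));
        let clock_ref = Arc::clone(&clock);
        let now = NowFn::from(move || clock_ref.load(Ordering::SeqCst));

        let state = ThrottlingState::new(&now);

        // No time has elapsed, so no tokens have accrued: any nonzero cost
        // is throttled.
        assert!(!state.throttling_check(10, 100, None, &now));

        // After one second at 100 bytes/s, 100 tokens are available.
        clock.store(1_000, Ordering::SeqCst);
        assert!(state.throttling_check(10, 100, None, &now));

        // The cost was deducted, leaving 90 tokens, so a larger cost fails.
        assert!(!state.throttling_check(91, 100, None, &now));
    }
}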

/// Encapsulates the statement logging state needed by the frontend peek sequencing.
///
/// This struct bundles together everything the frontend peek sequencing needs to
/// perform statement logging independently of the Coordinator's main task.
#[derive(Debug, Clone)]
pub struct StatementLoggingFrontend {
    /// Shared throttling state for rate-limiting statement logging.
    pub throttling_state: Arc<ThrottlingState>,
    /// Reproducible RNG for statement sampling (only used in tests).
    pub reproducible_rng: Arc<Mutex<rand_chacha::ChaCha8Rng>>,
    /// Cached human version string from build info.
    pub build_info_human_version: String,
    /// Function to get the current time for statement logging.
    pub now: NowFn,
}

impl StatementLoggingFrontend {
    /// Get prepared statement info for frontend peek sequencing.
    ///
    /// This function processes prepared statement logging info and builds the event rows.
    /// It does NOT do throttling; that is handled externally by the caller in
    /// `begin_statement_execution`. It DOES mutate the logging info to mark the statement
    /// as already logged.
    ///
    /// # Arguments
    /// * `session` - The session executing the statement
    /// * `logging` - Prepared statement logging info
    ///
    /// # Returns
    /// A tuple containing:
    /// - `Option<PreparedStatementEvent>`: If the prepared statement has not yet been logged,
    ///   returns the packed rows for the prepared statement.
    /// - `Uuid`: The UUID of the prepared statement.
    fn get_prepared_statement_info(
        &self,
        session: &mut Session,
        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
    ) -> (Option<PreparedStatementEvent>, Uuid) {
        let logging_ref = session.qcell_rw(&*logging);
        let mut prepared_statement_event = None;

        let ps_uuid = match logging_ref {
            PreparedStatementLoggingInfo::AlreadyLogged { uuid } => *uuid,
            PreparedStatementLoggingInfo::StillToLog {
                sql,
                redacted_sql,
                prepared_at,
                name,
                session_id,
                accounted,
                kind,
                _sealed: _,
            } => {
                assert!(
                    *accounted,
                    "accounting for logging should be done in `begin_statement_execution`"
                );
                let uuid = epoch_to_uuid_v7(prepared_at);
                let sql = std::mem::take(sql);
                let redacted_sql = std::mem::take(redacted_sql);
                let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();

                // Copy session_id before mutating logging_ref
                let sid = *session_id;

                let record = StatementPreparedRecord {
                    id: uuid,
                    sql_hash,
                    name: std::mem::take(name),
                    session_id: sid,
                    prepared_at: *prepared_at,
                    kind: *kind,
                };

                // `mz_prepared_statement_history`
                let mut mpsh_row = Row::default();
                let mut mpsh_packer = mpsh_row.packer();
                pack_statement_prepared_update(&record, &mut mpsh_packer);

                let sql_row = Row::pack([
                    Datum::TimestampTz(
                        to_datetime(*prepared_at)
                            .truncate_day()
                            .try_into()
                            .expect("must fit"),
                    ),
                    Datum::Bytes(sql_hash.as_slice()),
                    Datum::String(sql.as_str()),
                    Datum::String(redacted_sql.as_str()),
                ]);

                // Read throttled_count from shared state
                let throttled_count = self.throttling_state.get_throttled_count();

                mpsh_packer.push(Datum::UInt64(CastFrom::cast_from(throttled_count)));

                prepared_statement_event = Some(PreparedStatementEvent {
                    prepared_statement: mpsh_row,
                    sql_text: sql_row,
                    session_id: sid,
                });

                *logging_ref = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
                uuid
            }
        };

        (prepared_statement_event, ps_uuid)
    }

    /// Begin statement execution logging from the frontend. (Corresponds to
    /// `Coordinator::begin_statement_execution`, which is used by the old peek sequencing.)
    ///
    /// This encapsulates all the statement logging setup:
    /// - Retrieves system config values
    /// - Performs sampling and throttling checks
    /// - Creates statement logging records
    /// - Updates metrics
    ///
    /// Returns `None` if the statement should not be logged (due to sampling or throttling),
    /// or the info required to proceed with statement logging.
    /// The `Row` is the pre-packed row for `mz_statement_execution_history`.
    /// The `Option<PreparedStatementEvent>` is `None` when we have already logged the prepared
    /// statement before, and this is just a subsequent execution.
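    ///
    /// A hypothetical call site (a sketch; names illustrative only, not from
    /// this crate):
    ///
    /// ```ignore
    /// if let Some((id, record, row, prepared)) = frontend.begin_statement_execution(
    ///     &mut session,
    ///     &params,
    ///     &logging,
    ///     &system_config,
    ///     None,
    /// ) {
    ///     // Ship `row` (and `prepared`, if present) to the statement logging
    ///     // collections, and remember `id` to finalize the execution later.
    /// }
    /// ```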
    pub fn begin_statement_execution(
        &self,
        session: &mut Session,
        params: &Params,
        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
        system_config: &SystemVars,
        lifecycle_timestamps: Option<LifecycleTimestamps>,
    ) -> Option<(
        StatementLoggingId,
        StatementBeganExecutionRecord,
        Row,
        Option<PreparedStatementEvent>,
    )> {
        // Skip logging for internal users unless explicitly enabled
        let enable_internal_statement_logging = system_config.enable_internal_statement_logging();
        if session.user().is_internal() && !enable_internal_statement_logging {
            return None;
        }

        let sample_rate = effective_sample_rate(session, system_config);

        let use_reproducible_rng = system_config.statement_logging_use_reproducible_rng();
        let target_data_rate: Option<u64> = system_config
            .statement_logging_target_data_rate()
            .map(|rate| rate.cast_into());
        let max_data_credit: Option<u64> = system_config
            .statement_logging_max_data_credit()
            .map(|credit| credit.cast_into());

        // Only lock the RNG when we actually need reproducible sampling (tests only)
        let sample = if use_reproducible_rng {
            let mut rng = self.reproducible_rng.lock().expect("rng lock poisoned");
            should_sample_statement(sample_rate, Some(&mut *rng))
        } else {
            should_sample_statement(sample_rate, None)
        };

        let sampled_label = sample.then_some("true").unwrap_or("false");
        session
            .metrics()
            .statement_logging_records(&[sampled_label])
            .inc_by(1);

        // Clone only the metrics needed below, before the mutable borrow of session.
        let unsampled_bytes_metric = session
            .metrics()
            .statement_logging_unsampled_bytes()
            .clone();
        let actual_bytes_metric = session.metrics().statement_logging_actual_bytes().clone();

        // Handle the accounted flag and record byte metrics
        let is_new_prepared_statement = if let Some((sql, accounted)) =
            match session.qcell_rw(logging) {
                PreparedStatementLoggingInfo::AlreadyLogged { .. } => None,
                PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. } => {
                    Some((sql, accounted))
                }
            } {
            if !*accounted {
                unsampled_bytes_metric.inc_by(u64::cast_from(sql.len()));
                if sample {
                    actual_bytes_metric.inc_by(u64::cast_from(sql.len()));
                }
                *accounted = true;
            }
            true
        } else {
            false
        };

        if !sample {
            return None;
        }

        // Get prepared statement info (this also marks it as logged)
        let (prepared_statement_event, ps_uuid) =
            self.get_prepared_statement_info(session, logging);

        let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps {
            lifecycle_timestamps.received
        } else {
            (self.now)()
        };

        let current_time = (self.now)();
        let execution_uuid = epoch_to_uuid_v7(&current_time);

        // Create the execution record
        let began_execution = create_began_execution_record(
            execution_uuid,
            ps_uuid,
            sample_rate,
            params,
            session,
            began_at,
            self.build_info_human_version.clone(),
        );

        // Build rows to calculate cost for throttling
        let mseh_update = pack_statement_began_execution_update(&began_execution);
        let maybe_ps_prepared_statement = prepared_statement_event
            .as_ref()
            .map(|e| &e.prepared_statement);
        let maybe_ps_sql_text = prepared_statement_event.as_ref().map(|e| &e.sql_text);

        // Calculate cost of all rows we intend to log
        let cost: usize = [
            Some(&mseh_update),
            maybe_ps_prepared_statement,
            maybe_ps_sql_text,
        ]
        .into_iter()
        .filter_map(|row_opt| row_opt.map(|row| row.byte_len()))
        .fold(0_usize, |acc, x| acc.saturating_add(x));

        // Do throttling check
        let passed = if let Some(target_data_rate) = target_data_rate {
            self.throttling_state.throttling_check(
                cost.cast_into(),
                target_data_rate,
                max_data_credit,
                &self.now,
            )
        } else {
            true // No throttling configured
        };

        if !passed {
            // Increment throttled_count in shared state
            self.throttling_state.increment_throttled_count();
            return None;
        }

        // When we successfully log the first instance of a prepared statement
        // (i.e., it is not throttled), reset the throttled count for future tracking.
        if is_new_prepared_statement {
            self.throttling_state.reset_throttled_count();
        }

        Some((
            StatementLoggingId(execution_uuid),
            began_execution,
            mseh_update,
            prepared_statement_event,
        ))
    }
}

/// The effective rate at which statement execution should be sampled.
/// This is the value of the session var `statement_logging_sample_rate`,
/// constrained by the system var `statement_logging_max_sample_rate`.
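///
/// For example (an illustrative sketch, not from the original source): with a
/// system maximum of 0.1 and a user-requested rate of 0.5, the effective rate
/// is `f64::min(0.1, 0.5) == 0.1`.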
pub(crate) fn effective_sample_rate(session: &Session, system_vars: &SystemVars) -> f64 {
    let system_max: f64 = system_vars
        .statement_logging_max_sample_rate()
        .try_into()
        .expect("value constrained to be convertible to f64");
    let user_rate: f64 = session
        .vars()
        .get_statement_logging_sample_rate()
        .try_into()
        .expect("value constrained to be convertible to f64");
    f64::min(system_max, user_rate)
}

/// Helper function to decide whether to sample a statement execution.
/// Returns `true` if the statement should be sampled based on the sample rate.
///
/// If `reproducible_rng` is `Some`, uses the provided RNG for reproducible sampling (used in tests).
/// If `reproducible_rng` is `None`, uses the thread-local RNG.
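///
/// A minimal doc sketch (hypothetical, assuming `rand`'s `SeedableRng` API);
/// the degenerate rates 1.0 and 0.0 sample always and never, respectively:
///
/// ```ignore
/// use rand::SeedableRng;
///
/// let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(0);
/// assert!(should_sample_statement(1.0, Some(&mut rng)));
/// assert!(!should_sample_statement(0.0, Some(&mut rng)));
/// ```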
pub(crate) fn should_sample_statement(
    sample_rate: f64,
    reproducible_rng: Option<&mut rand_chacha::ChaCha8Rng>,
) -> bool {
    let distribution = Bernoulli::new(sample_rate).unwrap_or_else(|_| {
        soft_panic_or_log!("statement_logging_sample_rate is out of range [0, 1]");
        Bernoulli::new(0.0).expect("0.0 is valid for Bernoulli")
    });
    if let Some(rng) = reproducible_rng {
        distribution.sample(rng)
    } else {
        distribution.sample(&mut rand::rng())
    }
}

/// Helper function to serialize statement parameters for logging.
fn serialize_params(params: &Params) -> Vec<Option<String>> {
    std::iter::zip(params.execute_types.iter(), params.datums.iter())
        .map(|(r#type, datum)| {
            mz_pgrepr::Value::from_datum(datum, r#type).map(|val| {
                let mut buf = BytesMut::new();
                val.encode_text(&mut buf);
                String::from_utf8(Into::<Vec<u8>>::into(buf))
                    .expect("Serialization shouldn't produce non-UTF-8 strings.")
            })
        })
        .collect()
}

/// Helper function to create a `StatementBeganExecutionRecord`.
pub(crate) fn create_began_execution_record(
    execution_uuid: Uuid,
    prepared_statement_uuid: Uuid,
    sample_rate: f64,
    params: &Params,
    session: &Session,
    began_at: EpochMillis,
    build_info_version: String,
) -> StatementBeganExecutionRecord {
    let params = serialize_params(params);
    StatementBeganExecutionRecord {
        id: execution_uuid,
        prepared_statement_id: prepared_statement_uuid,
        sample_rate,
        params,
        began_at,
        application_name: session.application_name().to_string(),
        transaction_isolation: session.vars().transaction_isolation().to_string(),
        transaction_id: session
            .transaction()
            .inner()
            .map(|t| t.id)
            .unwrap_or_else(|| {
                // This should never happen because every statement runs in an explicit or implicit
                // transaction.
                soft_panic_or_log!(
                    "Statement logging got a statement with no associated transaction"
                );
                9999999
            }),
        mz_version: build_info_version,
        // These are not known yet; we'll fill them in later.
        cluster_id: None,
        cluster_name: None,
        execution_timestamp: None,
        transient_index_id: None,
        database_name: session.vars().database().into(),
        search_path: session
            .vars()
            .search_path()
            .iter()
            .map(|s| s.as_str().to_string())
            .collect(),
    }
}

/// Represents a single statement logging event that can be sent from the frontend
/// peek sequencing to the Coordinator via an mpsc channel.
#[derive(Debug, Clone)]
pub enum FrontendStatementLoggingEvent {
    /// Statement execution began, possibly with an associated prepared statement
    /// if this is the first time the prepared statement is being logged.
    BeganExecution {
        record: StatementBeganExecutionRecord,
        /// `mz_statement_execution_history`
        mseh_update: Row,
        prepared_statement: Option<PreparedStatementEvent>,
    },
    /// Statement execution ended.
    EndedExecution(StatementEndedExecutionRecord),
    /// Set the cluster for a statement execution.
    SetCluster {
        id: StatementLoggingId,
        cluster_id: ClusterId,
    },
    /// Set the execution timestamp for a statement.
    SetTimestamp {
        id: StatementLoggingId,
        timestamp: Timestamp,
    },
    /// Set the transient index ID for a statement.
    SetTransientIndex {
        id: StatementLoggingId,
        transient_index_id: GlobalId,
    },
    /// Record a statement lifecycle event.
    Lifecycle {
        id: StatementLoggingId,
        event: StatementLifecycleEvent,
        when: EpochMillis,
    },
}

pub(crate) fn pack_statement_execution_inner(
    record: &StatementBeganExecutionRecord,
    packer: &mut RowPacker,
) {
    let StatementBeganExecutionRecord {
        id,
        prepared_statement_id,
        sample_rate,
        params,
        began_at,
        cluster_id,
        cluster_name,
        database_name,
        search_path,
        application_name,
        transaction_isolation,
        execution_timestamp,
        transaction_id,
        transient_index_id,
        mz_version,
    } = record;

    let cluster = cluster_id.map(|id| id.to_string());
    let transient_index_id = transient_index_id.map(|id| id.to_string());
    packer.extend([
        Datum::Uuid(*id),
        Datum::Uuid(*prepared_statement_id),
        Datum::Float64((*sample_rate).into()),
        match &cluster {
            None => Datum::Null,
            Some(cluster_id) => Datum::String(cluster_id),
        },
        Datum::String(&*application_name),
        cluster_name.as_ref().map(String::as_str).into(),
        Datum::String(database_name),
    ]);
    packer.push_list(search_path.iter().map(|s| Datum::String(s)));
    packer.extend([
        Datum::String(&*transaction_isolation),
        (*execution_timestamp).into(),
        Datum::UInt64(*transaction_id),
        match &transient_index_id {
            None => Datum::Null,
            Some(transient_index_id) => Datum::String(transient_index_id),
        },
    ]);
    packer
        .try_push_array(
            &[ArrayDimension {
                lower_bound: 1,
                length: params.len(),
            }],
            params
                .iter()
                .map(|p| Datum::from(p.as_ref().map(String::as_str))),
        )
        .expect("correct array dimensions");
    packer.push(Datum::from(mz_version.as_str()));
    packer.push(Datum::TimestampTz(
        to_datetime(*began_at).try_into().expect("Sane system time"),
    ));
}

pub(crate) fn pack_statement_began_execution_update(record: &StatementBeganExecutionRecord) -> Row {
    let mut row = Row::default();
    let mut packer = row.packer();
    pack_statement_execution_inner(record, &mut packer);
    packer.extend([
        // finished_at
        Datum::Null,
        // finished_status
        Datum::Null,
        // error_message
        Datum::Null,
        // result_size
        Datum::Null,
        // rows_returned
        Datum::Null,
        // execution_status
        Datum::Null,
    ]);
    row
}

pub(crate) fn pack_statement_prepared_update(
    record: &StatementPreparedRecord,
    packer: &mut RowPacker,
) {
    let StatementPreparedRecord {
        id,
        session_id,
        name,
        sql_hash,
        prepared_at,
        kind,
    } = record;
    packer.extend([
        Datum::Uuid(*id),
        Datum::Uuid(*session_id),
        Datum::String(name.as_str()),
        Datum::Bytes(sql_hash.as_slice()),
        Datum::TimestampTz(to_datetime(*prepared_at).try_into().expect("must fit")),
        kind.map(statement_kind_label_value).into(),
    ]);
}

/// Bundles all information needed to install watch sets for statement lifecycle logging.
/// This includes the statement logging ID and the transitive dependencies to watch.
#[derive(Debug)]
pub struct WatchSetCreation {
    /// The statement logging ID for this execution.
    pub logging_id: StatementLoggingId,
    /// The timestamp at which to watch for dependencies becoming ready.
    pub timestamp: Timestamp,
    /// Transitive storage dependencies (tables, sources) to watch.
    pub storage_ids: BTreeSet<GlobalId>,
    /// Transitive compute dependencies (materialized views, indexes) to watch.
    pub compute_ids: BTreeSet<GlobalId>,
}

impl WatchSetCreation {
    /// Compute transitive dependencies for watch sets from an input ID bundle, categorized into
    /// storage and compute IDs.
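    ///
    /// An illustrative sketch (hypothetical names, not from this crate): for a
    /// peek whose `input_id_bundle` covers an index over a view over a table,
    /// the table's `GlobalId`s end up in `storage_ids` and the index's latest
    /// `GlobalId` in `compute_ids`.
    ///
    /// ```ignore
    /// let ws = WatchSetCreation::new(logging_id, catalog_state, &id_bundle, timestamp);
    /// assert!(ws.storage_ids.contains(&table_gid));
    /// assert!(ws.compute_ids.contains(&index_gid));
    /// ```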
    pub fn new(
        logging_id: StatementLoggingId,
        catalog_state: &CatalogState,
        input_id_bundle: &CollectionIdBundle,
        timestamp: Timestamp,
    ) -> Self {
        let mut storage_ids = BTreeSet::new();
        let mut compute_ids = BTreeSet::new();

        for item_id in input_id_bundle
            .iter()
            .map(|gid| catalog_state.get_entry_by_global_id(&gid).id())
            .flat_map(|id| catalog_state.transitive_uses(id))
        {
            let entry = catalog_state.get_entry(&item_id);
            match entry.item() {
                // TODO(alter_table): Adding all of the GlobalIds for an object is incorrect.
                // For example, this peek may depend on just a single version of a table, but
                // we would add dependencies on all versions of said table. Doing this is okay
                // for now since we can't yet version tables, but should get fixed.
                CatalogItem::Table(_) | CatalogItem::Source(_) => {
                    storage_ids.extend(entry.global_ids());
                }
                // Each catalog item is computed by at most one compute collection at a time,
                // which is also the most recent one.
                CatalogItem::MaterializedView(_) | CatalogItem::Index(_) => {
                    compute_ids.insert(entry.latest_global_id());
                }
                _ => {}
            }
        }

        Self {
            logging_id,
            timestamp,
            storage_ids,
            compute_ids,
        }
    }
}