// mz_adapter/statement_logging.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

10use std::collections::BTreeSet;
11use std::sync::atomic::Ordering;
12use std::sync::{Arc, Mutex};
13
14use bytes::BytesMut;
15use mz_catalog::memory::objects::CatalogItem;
16use mz_controller_types::ClusterId;
17use mz_ore::cast::{CastFrom, CastInto};
18use mz_ore::now::{EpochMillis, NowFn, epoch_to_uuid_v7, to_datetime};
19use mz_ore::soft_panic_or_log;
20use mz_repr::adt::array::ArrayDimension;
21use mz_repr::adt::timestamp::TimestampLike;
22use mz_repr::{Datum, GlobalId, Row, RowIterator, RowPacker, Timestamp};
23use mz_sql::ast::display::AstDisplay;
24use mz_sql::ast::{AstInfo, Statement};
25use mz_sql::plan::Params;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_sql::session::vars::SystemVars;
28use mz_sql_parser::ast::{StatementKind, statement_kind_label_value};
29use qcell::QCell;
30use rand::distr::{Bernoulli, Distribution};
31use sha2::{Digest, Sha256};
32use uuid::Uuid;
33
34use crate::catalog::CatalogState;
35use crate::session::{LifecycleTimestamps, Session, TransactionId};
36use crate::{AdapterError, CollectionIdBundle, ExecuteResponse};
37
/// Points of interest in the lifecycle of a statement's execution, each of
/// which can be reported as a separate event (see [`StatementLifecycleEvent::as_str`]
/// for the stable string labels).
#[derive(Clone, Debug)]
pub enum StatementLifecycleEvent {
    /// Execution of the statement began.
    ExecutionBegan,
    /// Optimization of the statement finished.
    OptimizationFinished,
    /// The statement's storage dependencies finished (became ready).
    StorageDependenciesFinished,
    /// The statement's compute dependencies finished (became ready).
    ComputeDependenciesFinished,
    /// Execution of the statement finished.
    ExecutionFinished,
}
46
47impl StatementLifecycleEvent {
48    pub fn as_str(&self) -> &str {
49        match self {
50            Self::ExecutionBegan => "execution-began",
51            Self::OptimizationFinished => "optimization-finished",
52            Self::StorageDependenciesFinished => "storage-dependencies-finished",
53            Self::ComputeDependenciesFinished => "compute-dependencies-finished",
54            Self::ExecutionFinished => "execution-finished",
55        }
56    }
57}
58
/// Contains all the information necessary to generate the initial
/// entry in `mz_statement_execution_history`. We need to keep this
/// around in order to modify the entry later once the statement finishes executing.
#[derive(Clone, Debug)]
pub struct StatementBeganExecutionRecord {
    /// Unique ID of this statement execution.
    pub id: Uuid,
    /// ID of the prepared statement that was executed.
    pub prepared_statement_id: Uuid,
    /// The sampling rate that was in effect when this execution was logged.
    pub sample_rate: f64,
    /// Textual renderings of the bound parameters; `None` where a value could
    /// not be rendered (see `serialize_params`).
    pub params: Vec<Option<String>>,
    /// Wall-clock time (epoch millis) at which execution began.
    pub began_at: EpochMillis,
    /// The cluster the statement ran on, if any.
    pub cluster_id: Option<ClusterId>,
    /// Human-readable name of that cluster, if any.
    pub cluster_name: Option<String>,
    /// Name of the database active in the session.
    pub database_name: String,
    /// The session's schema search path.
    pub search_path: Vec<String>,
    /// The session's application name.
    pub application_name: String,
    /// The session's transaction isolation level, rendered as a string.
    pub transaction_isolation: String,
    /// The timestamp the statement executed at, if one was chosen.
    pub execution_timestamp: Option<EpochMillis>,
    /// ID of the transaction the statement ran in.
    pub transaction_id: TransactionId,
    /// ID of the transient index created for this execution, if any.
    pub transient_index_id: Option<GlobalId>,
    /// The Materialize version string at the time of execution.
    pub mz_version: String,
}
80
/// How a statement's results were computed; see [`StatementExecutionStrategy::name`]
/// for the stable string labels.
#[derive(Clone, Copy, Debug)]
pub enum StatementExecutionStrategy {
    /// The statement was executed by spinning up a dataflow.
    Standard,
    /// The statement was executed by reading from an existing
    /// arrangement.
    FastPath,
    /// Experimental: The statement was executed by reading from an existing
    /// persist collection.
    PersistFastPath,
    /// The statement was determined to be constant by
    /// environmentd, and not sent to a cluster.
    Constant,
}
95
96impl StatementExecutionStrategy {
97    pub fn name(&self) -> &'static str {
98        match self {
99            Self::Standard => "standard",
100            Self::FastPath => "fast-path",
101            Self::PersistFastPath => "persist-fast-path",
102            Self::Constant => "constant",
103        }
104    }
105}
106
/// Why (and how) a statement execution ended.
#[derive(Clone, Debug)]
pub enum StatementEndedExecutionReason {
    /// The statement completed successfully.
    Success {
        /// Total encoded size in bytes of the returned rows, when known.
        result_size: Option<u64>,
        /// Number of rows returned, when known.
        rows_returned: Option<u64>,
        /// How the result was computed, when known.
        execution_strategy: Option<StatementExecutionStrategy>,
    },
    /// The statement was canceled.
    Canceled,
    /// The statement failed with an error.
    Errored {
        /// The rendered error message.
        error: String,
    },
    /// Should only be emitted by `impl Drop for ExecuteContextGuard`.
    /// Code paths that explicitly complete execution should use
    /// `retire_execute` with `Success`, `Canceled`, or `Errored`.
    Aborted,
}
123
/// Record describing the completion of a statement execution; pairs with the
/// corresponding [`StatementBeganExecutionRecord`] via `id`.
#[derive(Clone, Debug)]
pub struct StatementEndedExecutionRecord {
    /// ID of the statement execution that ended.
    pub id: Uuid,
    /// Why the execution ended.
    pub reason: StatementEndedExecutionReason,
    /// Wall-clock time (epoch millis) at which the execution ended.
    pub ended_at: EpochMillis,
}
130
/// Contains all the information necessary to generate an entry in
/// `mz_prepared_statement_history`
#[derive(Clone, Debug)]
pub(crate) struct StatementPreparedRecord {
    /// Unique ID of the prepared statement.
    pub id: Uuid,
    /// SHA-256 hash of the statement's SQL text.
    pub sql_hash: [u8; 32],
    /// The name with which the statement was prepared.
    pub name: String,
    /// ID of the session that prepared the statement.
    pub session_id: Uuid,
    /// Wall-clock time (epoch millis) at which the statement was prepared.
    pub prepared_at: EpochMillis,
    /// The top-level kind of the statement, or `None` for an empty statement.
    pub kind: Option<StatementKind>,
}
142
/// Record describing the start of a session, for session history logging.
#[derive(Clone, Debug)]
pub(crate) struct SessionHistoryEvent {
    /// Unique ID of the session.
    pub id: Uuid,
    /// Wall-clock time (epoch millis) at which the session connected.
    pub connected_at: EpochMillis,
    /// The session's application name.
    pub application_name: String,
    /// The user the session authenticated as.
    pub authenticated_user: String,
}
150
151impl From<&Result<ExecuteResponse, AdapterError>> for StatementEndedExecutionReason {
152    fn from(value: &Result<ExecuteResponse, AdapterError>) -> StatementEndedExecutionReason {
153        match value {
154            Ok(resp) => resp.into(),
155            Err(e) => StatementEndedExecutionReason::Errored {
156                error: e.to_string(),
157            },
158        }
159    }
160}
161
/// Classifies a final [`ExecuteResponse`] as a statement-ended reason.
///
/// Panics on response types whose termination is logged elsewhere (streaming
/// SELECTs, SUBSCRIBEs, COPY FROMs, and FETCHes) — reaching this impl with one
/// of those is a logic bug in the caller.
impl From<&ExecuteResponse> for StatementEndedExecutionReason {
    fn from(value: &ExecuteResponse) -> StatementEndedExecutionReason {
        match value {
            ExecuteResponse::CopyTo { resp, .. } => match resp.as_ref() {
                // NB [btv]: It's not clear that this combination
                // can ever actually happen.
                ExecuteResponse::SendingRowsImmediate { rows, .. } => {
                    // Note(parkmycar): It potentially feels bad here to iterate over the entire
                    // iterator _just_ to get the encoded result size. As noted above, it's not
                    // entirely clear this case ever happens, so the simplicity is worth it.
                    let result_size: usize = rows.box_clone().map(|row| row.byte_len()).sum();
                    StatementEndedExecutionReason::Success {
                        result_size: Some(u64::cast_from(result_size)),
                        rows_returned: Some(u64::cast_from(rows.count())),
                        execution_strategy: Some(StatementExecutionStrategy::Constant),
                    }
                }
                ExecuteResponse::SendingRowsStreaming { .. } => {
                    panic!("SELECTs terminate on peek finalization, not here.")
                }
                ExecuteResponse::Subscribing { .. } => {
                    panic!("SUBSCRIBEs terminate in the protocol layer, not here.")
                }
                _ => panic!("Invalid COPY response type"),
            },
            ExecuteResponse::CopyFrom { .. } => {
                panic!("COPY FROMs terminate in the protocol layer, not here.")
            }
            ExecuteResponse::Fetch { .. } => {
                panic!("FETCHes terminate after a follow-up message is sent.")
            }
            ExecuteResponse::SendingRowsStreaming { .. } => {
                panic!("SELECTs terminate on peek finalization, not here.")
            }
            ExecuteResponse::Subscribing { .. } => {
                panic!("SUBSCRIBEs terminate in the protocol layer, not here.")
            }

            ExecuteResponse::SendingRowsImmediate { rows, .. } => {
                // Note(parkmycar): It potentially feels bad here to iterate over the entire
                // iterator _just_ to get the encoded result size, the number of Rows returned here
                // shouldn't be too large though. An alternative is to pre-compute some of the
                // result size, but that would require always decoding Rows to handle projecting
                // away columns, which has a negative impact for much larger response sizes.
                let result_size: usize = rows.box_clone().map(|row| row.byte_len()).sum();
                StatementEndedExecutionReason::Success {
                    result_size: Some(u64::cast_from(result_size)),
                    rows_returned: Some(u64::cast_from(rows.count())),
                    execution_strategy: Some(StatementExecutionStrategy::Constant),
                }
            }

            // Responses that carry no row results: record a success with no
            // size, row count, or execution strategy.
            ExecuteResponse::AlteredDefaultPrivileges
            | ExecuteResponse::AlteredObject(_)
            | ExecuteResponse::AlteredRole
            | ExecuteResponse::AlteredSystemConfiguration
            | ExecuteResponse::ClosedCursor
            | ExecuteResponse::Comment
            | ExecuteResponse::Copied(_)
            | ExecuteResponse::CreatedConnection
            | ExecuteResponse::CreatedDatabase
            | ExecuteResponse::CreatedSchema
            | ExecuteResponse::CreatedRole
            | ExecuteResponse::CreatedCluster
            | ExecuteResponse::CreatedClusterReplica
            | ExecuteResponse::CreatedIndex
            | ExecuteResponse::CreatedIntrospectionSubscribe
            | ExecuteResponse::CreatedSecret
            | ExecuteResponse::CreatedSink
            | ExecuteResponse::CreatedSource
            | ExecuteResponse::CreatedTable
            | ExecuteResponse::CreatedView
            | ExecuteResponse::CreatedViews
            | ExecuteResponse::CreatedMaterializedView
            | ExecuteResponse::CreatedContinualTask
            | ExecuteResponse::CreatedType
            | ExecuteResponse::CreatedNetworkPolicy
            | ExecuteResponse::Deallocate { .. }
            | ExecuteResponse::DeclaredCursor
            | ExecuteResponse::Deleted(_)
            | ExecuteResponse::DiscardedTemp
            | ExecuteResponse::DiscardedAll
            | ExecuteResponse::DroppedObject(_)
            | ExecuteResponse::DroppedOwned
            | ExecuteResponse::EmptyQuery
            | ExecuteResponse::GrantedPrivilege
            | ExecuteResponse::GrantedRole
            | ExecuteResponse::Inserted(_)
            | ExecuteResponse::Prepare
            | ExecuteResponse::Raised
            | ExecuteResponse::ReassignOwned
            | ExecuteResponse::RevokedPrivilege
            | ExecuteResponse::RevokedRole
            | ExecuteResponse::SetVariable { .. }
            | ExecuteResponse::StartedTransaction
            | ExecuteResponse::TransactionCommitted { .. }
            | ExecuteResponse::TransactionRolledBack { .. }
            | ExecuteResponse::Updated(_)
            | ExecuteResponse::ValidatedConnection { .. } => {
                StatementEndedExecutionReason::Success {
                    result_size: None,
                    rows_returned: None,
                    execution_strategy: None,
                }
            }
        }
    }
}
270
mod sealed {
    /// A struct that is purposefully private so folks are forced to use the constructor of an
    /// enum. (Holding a `Private` proves the value was built inside this module's crate-visible
    /// constructors.)
    #[derive(Debug, Copy, Clone)]
    pub struct Private;
}
277
/// Metadata required for logging a prepared statement.
#[derive(Debug)]
pub enum PreparedStatementLoggingInfo {
    /// The statement has already been logged; we don't need to log it
    /// again if a future execution hits the sampling rate; we merely
    /// need to reference the corresponding UUID.
    AlreadyLogged { uuid: Uuid },
    /// The statement has not yet been logged; if a future execution
    /// hits the sampling rate, we need to log it at that point.
    StillToLog {
        /// The SQL text of the statement.
        sql: String,
        /// The SQL text of the statement, redacted to follow our data management
        /// policy
        redacted_sql: String,
        /// When the statement was prepared
        prepared_at: EpochMillis,
        /// The name with which the statement was prepared
        name: String,
        /// The ID of the session that prepared the statement
        session_id: Uuid,
        /// Whether we have already recorded this in the "would have logged" metric
        accounted: bool,
        /// The top-level kind of the statement (e.g., `Select`), or `None` for an empty statement
        kind: Option<StatementKind>,

        /// Private type that forces use of the [`PreparedStatementLoggingInfo::still_to_log`]
        /// constructor.
        _sealed: sealed::Private,
    },
}
309
310impl PreparedStatementLoggingInfo {
311    /// Constructor for the [`PreparedStatementLoggingInfo::StillToLog`] variant that ensures SQL
312    /// statements are properly redacted.
313    pub fn still_to_log<A: AstInfo>(
314        raw_sql: String,
315        stmt: Option<&Statement<A>>,
316        prepared_at: EpochMillis,
317        name: String,
318        session_id: Uuid,
319        accounted: bool,
320    ) -> Self {
321        let kind = stmt.map(StatementKind::from);
322        let sql = match kind {
323            // Always redact SQL statements that may contain sensitive information.
324            // CREATE SECRET and ALTER SECRET statements can contain secret values, so we redact them.
325            // INSERT, UPDATE, and EXECUTE statements can include large amounts of user data, so we redact them for both
326            // data privacy and to avoid logging excessive data.
327            Some(
328                StatementKind::CreateSecret
329                | StatementKind::AlterSecret
330                | StatementKind::Insert
331                | StatementKind::Update
332                | StatementKind::Execute,
333            ) => stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
334            _ => raw_sql,
335        };
336
337        PreparedStatementLoggingInfo::StillToLog {
338            sql,
339            redacted_sql: stmt.map(|s| s.to_ast_string_redacted()).unwrap_or_default(),
340            prepared_at,
341            name,
342            session_id,
343            accounted,
344            kind,
345            _sealed: sealed::Private,
346        }
347    }
348}
349
/// Newtype wrapping the UUID of a logged statement execution, used to refer
/// back to the execution's logging records.
#[derive(Copy, Clone, Debug, Ord, Eq, PartialOrd, PartialEq)]
pub struct StatementLoggingId(pub Uuid);
352
/// Rows to be written to `mz_prepared_statement_history` and `mz_sql_text`, with the session id.
#[derive(Debug, Clone)]
pub struct PreparedStatementEvent {
    /// Packed row destined for `mz_prepared_statement_history`.
    pub prepared_statement: Row,
    /// Packed row destined for `mz_sql_text`.
    pub sql_text: Row,
    /// ID of the session that prepared the statement.
    pub session_id: Uuid,
}
360
/// Throttling state for statement logging, shared across multiple frontend tasks (and currently
/// also shared with the old peek sequencing).
#[derive(Debug)]
pub struct ThrottlingState {
    /// Inner state protected by a mutex for rate-limiting, because the two inner fields have to be
    /// manipulated together atomically.
    /// This mutex is locked once per unsampled query. (There is both sampling and throttling.
    /// Sampling happens before throttling.) This should be ok for now: Our QPS will not be more
    /// than 10000s for now, and a mutex should be able to do 100000s of lockings per second, even
    /// with some contention. If this ever becomes an issue, then we could redesign throttling to be
    /// per-session/per-tokio-worker-thread.
    inner: Mutex<ThrottlingStateInner>,
    /// The number of statements that have been throttled since the last successfully logged
    /// statement. This is not needed for the throttling decision itself, so it can be a separate
    /// atomic to allow reading/writing without acquiring the inner mutex.
    throttled_count: std::sync::atomic::AtomicUsize,
}
378
/// Token-bucket state for [`ThrottlingState`]; both fields must be updated
/// together under the mutex.
#[derive(Debug)]
struct ThrottlingStateInner {
    /// The number of bytes that we are allowed to emit for statement logging without being throttled.
    /// Increases at a rate of [`mz_sql::session::vars::STATEMENT_LOGGING_TARGET_DATA_RATE`] per second,
    /// up to a max value of [`mz_sql::session::vars::STATEMENT_LOGGING_MAX_DATA_CREDIT`].
    tokens: u64,
    /// The last time at which a statement was logged.
    last_logged_ts_seconds: u64,
}
388
389impl ThrottlingState {
390    /// Create a new throttling state.
391    pub fn new(now: &NowFn) -> Self {
392        Self {
393            inner: Mutex::new(ThrottlingStateInner {
394                tokens: 0,
395                last_logged_ts_seconds: now() / 1000,
396            }),
397            throttled_count: std::sync::atomic::AtomicUsize::new(0),
398        }
399    }
400
401    /// Check if we need to drop a statement due to throttling, and update the number of available
402    /// tokens appropriately.
403    ///
404    /// Returns `false` if we must throttle this statement, and `true` otherwise.
405    /// Note: `throttled_count` is NOT modified by this method - callers are responsible
406    /// for incrementing it on throttle failure and resetting it when appropriate.
407    pub fn throttling_check(
408        &self,
409        cost: u64,
410        target_data_rate: u64,
411        max_data_credit: Option<u64>,
412        now: &NowFn,
413    ) -> bool {
414        let ts = now() / 1000;
415        let mut inner = self.inner.lock().expect("throttling state lock poisoned");
416        // We use saturating_sub here because system time isn't monotonic, causing cases
417        // when last_logged_ts_seconds is greater than ts.
418        let elapsed = ts.saturating_sub(inner.last_logged_ts_seconds);
419        inner.last_logged_ts_seconds = ts;
420        inner.tokens = inner
421            .tokens
422            .saturating_add(target_data_rate.saturating_mul(elapsed));
423        if let Some(max_data_credit) = max_data_credit {
424            inner.tokens = inner.tokens.min(max_data_credit);
425        }
426        if let Some(remaining) = inner.tokens.checked_sub(cost) {
427            tracing::debug!("throttling check passed. tokens remaining: {remaining}; cost: {cost}");
428            inner.tokens = remaining;
429            true
430        } else {
431            tracing::debug!(
432                "throttling check failed. tokens available: {}; cost: {cost}",
433                inner.tokens
434            );
435            false
436        }
437    }
438
439    pub fn get_throttled_count(&self) -> usize {
440        self.throttled_count.load(Ordering::Relaxed)
441    }
442
443    pub fn increment_throttled_count(&self) {
444        self.throttled_count.fetch_add(1, Ordering::Relaxed);
445    }
446
447    pub fn reset_throttled_count(&self) {
448        self.throttled_count.store(0, Ordering::Relaxed);
449    }
450}
451
/// Encapsulates statement logging state needed by the frontend peek sequencing.
///
/// This struct bundles together all the statement logging-related state that
/// the frontend peek sequencing needs to perform statement logging independently
/// of the Coordinator's main task.
#[derive(Debug, Clone)]
pub struct StatementLoggingFrontend {
    /// Shared throttling state for rate-limiting statement logging.
    pub throttling_state: Arc<ThrottlingState>,
    /// Reproducible RNG for statement sampling (only used in tests).
    pub reproducible_rng: Arc<Mutex<rand_chacha::ChaCha8Rng>>,
    /// Cached human version string from build info.
    pub build_info_human_version: String,
    /// Function to get current time for statement logging.
    pub now: NowFn,
}
468
impl StatementLoggingFrontend {
    /// Get prepared statement info for frontend peek sequencing.
    ///
    /// This function processes prepared statement logging info and builds the event rows.
    /// It does NOT do throttling - that is handled externally by the caller in `begin_statement_execution`.
    /// It DOES mutate the logging info to mark the statement as already logged.
    ///
    /// # Arguments
    /// * `session` - The session executing the statement
    /// * `logging` - Prepared statement logging info
    ///
    /// # Returns
    /// A tuple containing:
    /// - `Option<PreparedStatementEvent>`: If the prepared statement has not yet been logged,
    ///   returns the packed rows for the prepared statement.
    /// - `Uuid`: The UUID of the prepared statement.
    fn get_prepared_statement_info(
        &self,
        session: &mut Session,
        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
    ) -> (Option<PreparedStatementEvent>, Uuid) {
        let logging_ref = session.qcell_rw(&*logging);
        let mut prepared_statement_event = None;

        let ps_uuid = match logging_ref {
            // Logged on a previous execution: only the UUID is needed.
            PreparedStatementLoggingInfo::AlreadyLogged { uuid } => *uuid,
            PreparedStatementLoggingInfo::StillToLog {
                sql,
                redacted_sql,
                prepared_at,
                name,
                session_id,
                accounted,
                kind,
                _sealed: _,
            } => {
                assert!(
                    *accounted,
                    "accounting for logging should be done in `begin_statement_execution`"
                );
                // The UUID is derived from the preparation time (v7), so IDs sort by time.
                let uuid = epoch_to_uuid_v7(prepared_at);
                // Move the strings out of the logging info; it is about to be
                // overwritten with `AlreadyLogged` below anyway.
                let sql = std::mem::take(sql);
                let redacted_sql = std::mem::take(redacted_sql);
                let sql_hash: [u8; 32] = Sha256::digest(sql.as_bytes()).into();

                // Copy session_id before mutating logging_ref
                let sid = *session_id;

                let record = StatementPreparedRecord {
                    id: uuid,
                    sql_hash,
                    name: std::mem::take(name),
                    session_id: sid,
                    prepared_at: *prepared_at,
                    kind: *kind,
                };

                // `mz_prepared_statement_history`
                let mut mpsh_row = Row::default();
                let mut mpsh_packer = mpsh_row.packer();
                pack_statement_prepared_update(&record, &mut mpsh_packer);

                // Row for `mz_sql_text`: day-truncated timestamp, hash, and both
                // the raw and redacted SQL texts.
                let sql_row = Row::pack([
                    Datum::TimestampTz(
                        to_datetime(*prepared_at)
                            .truncate_day()
                            .try_into()
                            .expect("must fit"),
                    ),
                    Datum::Bytes(sql_hash.as_slice()),
                    Datum::String(sql.as_str()),
                    Datum::String(redacted_sql.as_str()),
                ]);

                // Read throttled_count from shared state
                let throttled_count = self.throttling_state.get_throttled_count();

                mpsh_packer.push(Datum::UInt64(CastFrom::cast_from(throttled_count)));

                prepared_statement_event = Some(PreparedStatementEvent {
                    prepared_statement: mpsh_row,
                    sql_text: sql_row,
                    session_id: sid,
                });

                // Mark as logged so future executions only reference the UUID.
                *logging_ref = PreparedStatementLoggingInfo::AlreadyLogged { uuid };
                uuid
            }
        };

        (prepared_statement_event, ps_uuid)
    }

    /// Begin statement execution logging from the frontend. (Corresponds to
    /// `Coordinator::begin_statement_execution`, which is used by the old peek sequencing.)
    ///
    /// This encapsulates all the statement logging setup:
    /// - Retrieves system config values
    /// - Performs sampling and throttling checks
    /// - Creates statement logging records
    /// - Attends to metrics.
    ///
    /// Returns None if the statement should not be logged (due to sampling or throttling), or the
    /// info required to proceed with statement logging.
    /// The `Row` is the pre-packed row for `mz_statement_execution_history`.
    /// The `Option<PreparedStatementEvent>` is None when we have already logged the prepared
    /// statement before, and this is just a subsequent execution.
    pub fn begin_statement_execution(
        &self,
        session: &mut Session,
        params: &Params,
        logging: &Arc<QCell<PreparedStatementLoggingInfo>>,
        system_config: &SystemVars,
        lifecycle_timestamps: Option<LifecycleTimestamps>,
    ) -> Option<(
        StatementLoggingId,
        StatementBeganExecutionRecord,
        Row,
        Option<PreparedStatementEvent>,
    )> {
        // Skip logging for internal users unless explicitly enabled
        let enable_internal_statement_logging = system_config.enable_internal_statement_logging();
        if session.user().is_internal() && !enable_internal_statement_logging {
            return None;
        }

        let sample_rate = effective_sample_rate(session, system_config);

        let use_reproducible_rng = system_config.statement_logging_use_reproducible_rng();
        let target_data_rate: Option<u64> = system_config
            .statement_logging_target_data_rate()
            .map(|rate| rate.cast_into());
        let max_data_credit: Option<u64> = system_config
            .statement_logging_max_data_credit()
            .map(|credit| credit.cast_into());

        // Only lock the RNG when we actually need reproducible sampling (tests only)
        let sample = if use_reproducible_rng {
            let mut rng = self.reproducible_rng.lock().expect("rng lock poisoned");
            should_sample_statement(sample_rate, Some(&mut *rng))
        } else {
            should_sample_statement(sample_rate, None)
        };

        // Count every statement (sampled or not) in the records metric.
        let sampled_label = sample.then_some("true").unwrap_or("false");
        session
            .metrics()
            .statement_logging_records(&[sampled_label])
            .inc_by(1);

        // Clone only the metrics needed below, before the mutable borrow of session.
        let unsampled_bytes_metric = session
            .metrics()
            .statement_logging_unsampled_bytes()
            .clone();
        let actual_bytes_metric = session.metrics().statement_logging_actual_bytes().clone();

        // Handle the accounted flag and record byte metrics. `true` means this
        // is the first execution of a not-yet-logged prepared statement.
        let is_new_prepared_statement = if let Some((sql, accounted)) =
            match session.qcell_rw(logging) {
                PreparedStatementLoggingInfo::AlreadyLogged { .. } => None,
                PreparedStatementLoggingInfo::StillToLog { sql, accounted, .. } => {
                    Some((sql, accounted))
                }
            } {
            if !*accounted {
                unsampled_bytes_metric.inc_by(u64::cast_from(sql.len()));
                if sample {
                    actual_bytes_metric.inc_by(u64::cast_from(sql.len()));
                }
                *accounted = true;
            }
            true
        } else {
            false
        };

        // Sampling happens before throttling; unsampled statements are done here.
        if !sample {
            return None;
        }

        // Get prepared statement info (this also marks it as logged)
        let (prepared_statement_event, ps_uuid) =
            self.get_prepared_statement_info(session, logging);

        // Prefer the time the query was received by the frontend, when available.
        let began_at = if let Some(lifecycle_timestamps) = lifecycle_timestamps {
            lifecycle_timestamps.received
        } else {
            (self.now)()
        };

        let current_time = (self.now)();
        let execution_uuid = epoch_to_uuid_v7(&current_time);

        // Create the execution record
        let began_execution = create_began_execution_record(
            execution_uuid,
            ps_uuid,
            sample_rate,
            params,
            session,
            began_at,
            self.build_info_human_version.clone(),
        );

        // Build rows to calculate cost for throttling
        let mseh_update = pack_statement_began_execution_update(&began_execution);
        let maybe_ps_prepared_statement = prepared_statement_event
            .as_ref()
            .map(|e| &e.prepared_statement);
        let maybe_ps_sql_text = prepared_statement_event.as_ref().map(|e| &e.sql_text);

        // Calculate cost of all rows we intend to log
        let cost: usize = [
            Some(&mseh_update),
            maybe_ps_prepared_statement,
            maybe_ps_sql_text,
        ]
        .into_iter()
        .filter_map(|row_opt| row_opt.map(|row| row.byte_len()))
        .fold(0_usize, |acc, x| acc.saturating_add(x));

        // Do throttling check
        let passed = if let Some(target_data_rate) = target_data_rate {
            self.throttling_state.throttling_check(
                cost.cast_into(),
                target_data_rate,
                max_data_credit,
                &self.now,
            )
        } else {
            true // No throttling configured
        };

        if !passed {
            // Increment throttled_count in shared state
            self.throttling_state.increment_throttled_count();
            return None;
        }

        // When we successfully log the first instance of a prepared statement
        // (i.e., it is not throttled), reset the throttled count for future tracking.
        if is_new_prepared_statement {
            self.throttling_state.reset_throttled_count();
        }

        Some((
            StatementLoggingId(execution_uuid),
            began_execution,
            mseh_update,
            prepared_statement_event,
        ))
    }
}
723
724/// The effective rate at which statement execution should be sampled.
725/// This is the value of the session var `statement_logging_sample_rate`,
726/// constrained by the system var `statement_logging_max_sample_rate`.
727pub(crate) fn effective_sample_rate(session: &Session, system_vars: &SystemVars) -> f64 {
728    let system_max: f64 = system_vars
729        .statement_logging_max_sample_rate()
730        .try_into()
731        .expect("value constrained to be convertible to f64");
732    let user_rate: f64 = session
733        .vars()
734        .get_statement_logging_sample_rate()
735        .try_into()
736        .expect("value constrained to be convertible to f64");
737    f64::min(system_max, user_rate)
738}
739
740/// Helper function to decide whether to sample a statement execution.
741/// Returns `true` if the statement should be sampled based on the sample rate.
742///
743/// If `reproducible_rng` is `Some`, uses the provided RNG for reproducible sampling (used in tests).
744/// If `reproducible_rng` is `None`, uses the thread-local RNG.
745pub(crate) fn should_sample_statement(
746    sample_rate: f64,
747    reproducible_rng: Option<&mut rand_chacha::ChaCha8Rng>,
748) -> bool {
749    let distribution = Bernoulli::new(sample_rate).unwrap_or_else(|_| {
750        soft_panic_or_log!("statement_logging_sample_rate is out of range [0, 1]");
751        Bernoulli::new(0.0).expect("0.0 is valid for Bernoulli")
752    });
753    if let Some(rng) = reproducible_rng {
754        distribution.sample(rng)
755    } else {
756        distribution.sample(&mut rand::rng())
757    }
758}
759
760/// Helper function to serialize statement parameters for logging.
761fn serialize_params(params: &Params) -> Vec<Option<String>> {
762    std::iter::zip(params.execute_types.iter(), params.datums.iter())
763        .map(|(r#type, datum)| {
764            mz_pgrepr::Value::from_datum(datum, r#type).map(|val| {
765                let mut buf = BytesMut::new();
766                val.encode_text(&mut buf);
767                String::from_utf8(Into::<Vec<u8>>::into(buf))
768                    .expect("Serialization shouldn't produce non-UTF-8 strings.")
769            })
770        })
771        .collect()
772}
773
774/// Helper function to create a `StatementBeganExecutionRecord`.
775pub(crate) fn create_began_execution_record(
776    execution_uuid: Uuid,
777    prepared_statement_uuid: Uuid,
778    sample_rate: f64,
779    params: &Params,
780    session: &Session,
781    began_at: EpochMillis,
782    build_info_version: String,
783) -> StatementBeganExecutionRecord {
784    let params = serialize_params(params);
785    StatementBeganExecutionRecord {
786        id: execution_uuid,
787        prepared_statement_id: prepared_statement_uuid,
788        sample_rate,
789        params,
790        began_at,
791        application_name: session.application_name().to_string(),
792        transaction_isolation: session.vars().transaction_isolation().to_string(),
793        transaction_id: session
794            .transaction()
795            .inner()
796            .map(|t| t.id)
797            .unwrap_or_else(|| {
798                // This should never happen because every statement runs in an explicit or implicit
799                // transaction.
800                soft_panic_or_log!(
801                    "Statement logging got a statement with no associated transaction"
802                );
803                9999999
804            }),
805        mz_version: build_info_version,
806        // These are not known yet; we'll fill them in later.
807        cluster_id: None,
808        cluster_name: None,
809        execution_timestamp: None,
810        transient_index_id: None,
811        database_name: session.vars().database().into(),
812        search_path: session
813            .vars()
814            .search_path()
815            .iter()
816            .map(|s| s.as_str().to_string())
817            .collect(),
818    }
819}
820
/// Represents a single statement logging event that can be sent from the frontend
/// peek sequencing to the Coordinator via an mpsc channel.
#[derive(Debug, Clone)]
pub enum FrontendStatementLoggingEvent {
    /// Statement execution began, possibly with an associated prepared statement
    /// if this is the first time the prepared statement is being logged
    BeganExecution {
        /// The execution metadata captured when the statement began.
        record: StatementBeganExecutionRecord,
        /// Pre-packed row update destined for `mz_statement_execution_history`.
        mseh_update: Row,
        /// Present only when the prepared statement itself has not been
        /// logged before; `None` on subsequent executions.
        prepared_statement: Option<PreparedStatementEvent>,
    },
    /// Statement execution ended
    EndedExecution(StatementEndedExecutionRecord),
    /// Set the cluster for a statement execution
    SetCluster {
        /// Identifies which in-flight execution this update applies to.
        id: StatementLoggingId,
        /// The cluster the statement ended up running on.
        cluster_id: ClusterId,
    },
    /// Set the execution timestamp for a statement
    SetTimestamp {
        /// Identifies which in-flight execution this update applies to.
        id: StatementLoggingId,
        /// The timestamp the execution was assigned.
        timestamp: Timestamp,
    },
    /// Set the transient index ID for a statement
    SetTransientIndex {
        /// Identifies which in-flight execution this update applies to.
        id: StatementLoggingId,
        /// The transient (dataflow-local) index created for the statement.
        transient_index_id: GlobalId,
    },
    /// Record a statement lifecycle event
    Lifecycle {
        /// Identifies which in-flight execution this update applies to.
        id: StatementLoggingId,
        /// Which lifecycle transition occurred.
        event: StatementLifecycleEvent,
        /// Wall-clock time (epoch millis) at which the event occurred.
        when: EpochMillis,
    },
}
857
/// Packs the fields of a `StatementBeganExecutionRecord` into `packer`.
///
/// NOTE(review): the exact push order here presumably must match the column
/// order of the statement-execution-history relation (the caller appends the
/// "finished" columns afterwards) — confirm against the builtin relation
/// definition before reordering anything.
pub(crate) fn pack_statement_execution_inner(
    record: &StatementBeganExecutionRecord,
    packer: &mut RowPacker,
) {
    // Exhaustive destructure: adding a field to the record without updating
    // this function becomes a compile error.
    let StatementBeganExecutionRecord {
        id,
        prepared_statement_id,
        sample_rate,
        params,
        began_at,
        cluster_id,
        cluster_name,
        database_name,
        search_path,
        application_name,
        transaction_isolation,
        execution_timestamp,
        transaction_id,
        transient_index_id,
        mz_version,
    } = record;

    // Stringify the optional IDs up front so the `Datum::String`s below can
    // borrow from them.
    let cluster = cluster_id.map(|id| id.to_string());
    let transient_index_id = transient_index_id.map(|id| id.to_string());
    packer.extend([
        Datum::Uuid(*id),
        Datum::Uuid(*prepared_statement_id),
        Datum::Float64((*sample_rate).into()),
        match &cluster {
            None => Datum::Null,
            Some(cluster_id) => Datum::String(cluster_id),
        },
        Datum::String(&*application_name),
        cluster_name.as_ref().map(String::as_str).into(),
        Datum::String(database_name),
    ]);
    // `search_path` is a list-typed column, so it needs its own list push.
    packer.push_list(search_path.iter().map(|s| Datum::String(s)));
    packer.extend([
        Datum::String(&*transaction_isolation),
        (*execution_timestamp).into(),
        Datum::UInt64(*transaction_id),
        match &transient_index_id {
            None => Datum::Null,
            Some(transient_index_id) => Datum::String(transient_index_id),
        },
    ]);
    // Parameters are stored as a 1-dimensional array of nullable strings
    // (NULL for parameters that were SQL NULL).
    packer
        .try_push_array(
            &[ArrayDimension {
                lower_bound: 1,
                length: params.len(),
            }],
            params
                .iter()
                .map(|p| Datum::from(p.as_ref().map(String::as_str))),
        )
        .expect("correct array dimensions");
    packer.push(Datum::from(mz_version.as_str()));
    packer.push(Datum::TimestampTz(
        to_datetime(*began_at).try_into().expect("Sane system time"),
    ));
}
920
921pub(crate) fn pack_statement_began_execution_update(record: &StatementBeganExecutionRecord) -> Row {
922    let mut row = Row::default();
923    let mut packer = row.packer();
924    pack_statement_execution_inner(record, &mut packer);
925    packer.extend([
926        // finished_at
927        Datum::Null,
928        // finished_status
929        Datum::Null,
930        // error_message
931        Datum::Null,
932        // result_size
933        Datum::Null,
934        // rows_returned
935        Datum::Null,
936        // execution_status
937        Datum::Null,
938    ]);
939    row
940}
941
942pub(crate) fn pack_statement_prepared_update(
943    record: &StatementPreparedRecord,
944    packer: &mut RowPacker,
945) {
946    let StatementPreparedRecord {
947        id,
948        session_id,
949        name,
950        sql_hash,
951        prepared_at,
952        kind,
953    } = record;
954    packer.extend([
955        Datum::Uuid(*id),
956        Datum::Uuid(*session_id),
957        Datum::String(name.as_str()),
958        Datum::Bytes(sql_hash.as_slice()),
959        Datum::TimestampTz(to_datetime(*prepared_at).try_into().expect("must fit")),
960        kind.map(statement_kind_label_value).into(),
961    ]);
962}
963
/// Bundles all information needed to install watch sets for statement lifecycle logging.
/// This includes the statement logging ID and the transitive dependencies to watch.
#[derive(Debug)]
pub struct WatchSetCreation {
    /// The statement logging ID for this execution.
    pub logging_id: StatementLoggingId,
    /// The timestamp at which to watch for dependencies becoming ready.
    pub timestamp: Timestamp,
    /// Transitive storage dependencies (tables, sources) to watch.
    pub storage_ids: BTreeSet<GlobalId>,
    /// Transitive compute dependencies (materialized views, indexes) to watch.
    pub compute_ids: BTreeSet<GlobalId>,
}
977
978impl WatchSetCreation {
979    /// Compute transitive dependencies for watch sets from an input ID bundle, categorized into
980    /// storage and compute IDs.
981    pub fn new(
982        logging_id: StatementLoggingId,
983        catalog_state: &CatalogState,
984        input_id_bundle: &CollectionIdBundle,
985        timestamp: Timestamp,
986    ) -> Self {
987        let mut storage_ids = BTreeSet::new();
988        let mut compute_ids = BTreeSet::new();
989
990        for item_id in input_id_bundle
991            .iter()
992            .map(|gid| catalog_state.get_entry_by_global_id(&gid).id())
993            .flat_map(|id| catalog_state.transitive_uses(id))
994        {
995            let entry = catalog_state.get_entry(&item_id);
996            match entry.item() {
997                // TODO(alter_table): Adding all of the GlobalIds for an object is incorrect.
998                // For example, this peek may depend on just a single version of a table, but
999                // we would add dependencies on all versions of said table. Doing this is okay
1000                // for now since we can't yet version tables, but should get fixed.
1001                CatalogItem::Table(_) | CatalogItem::Source(_) => {
1002                    storage_ids.extend(entry.global_ids());
1003                }
1004                // Each catalog item is computed by at most one compute collection at a time,
1005                // which is also the most recent one.
1006                CatalogItem::MaterializedView(_) | CatalogItem::Index(_) => {
1007                    compute_ids.insert(entry.latest_global_id());
1008                }
1009                _ => {}
1010            }
1011        }
1012
1013        Self {
1014            logging_id,
1015            timestamp,
1016            storage_ids,
1017            compute_ids,
1018        }
1019    }
1020}