mz_adapter/
statement_logging.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use mz_controller_types::ClusterId;
11use mz_ore::cast::CastFrom;
12use mz_ore::now::EpochMillis;
13use mz_repr::{GlobalId, RowIterator};
14use mz_sql_parser::ast::StatementKind;
15use uuid::Uuid;
16
17use crate::session::TransactionId;
18use crate::{AdapterError, ExecuteResponse};
19
/// A milestone reached while a statement is being executed.
///
/// Each variant has a kebab-case textual form, available through
/// [`StatementLifecycleEvent::as_str`].
#[derive(Clone, Debug)]
pub enum StatementLifecycleEvent {
    /// Rendered as `execution-began`.
    ExecutionBegan,
    /// Rendered as `optimization-finished`.
    OptimizationFinished,
    /// Rendered as `storage-dependencies-finished`.
    StorageDependenciesFinished,
    /// Rendered as `compute-dependencies-finished`.
    ComputeDependenciesFinished,
    /// Rendered as `execution-finished`.
    ExecutionFinished,
}

impl StatementLifecycleEvent {
    /// Returns the kebab-case name of this event.
    ///
    /// Every name is a string literal, so the returned slice is `'static`
    /// rather than being tied to the borrow of `self`. This matches
    /// `StatementExecutionStrategy::name` and lets callers keep the name
    /// after the event value itself has been dropped.
    pub fn as_str(&self) -> &'static str {
        match self {
            Self::ExecutionBegan => "execution-began",
            Self::OptimizationFinished => "optimization-finished",
            Self::StorageDependenciesFinished => "storage-dependencies-finished",
            Self::ComputeDependenciesFinished => "compute-dependencies-finished",
            Self::ExecutionFinished => "execution-finished",
        }
    }
}
40
/// Contains all the information necessary to generate the initial
/// entry in `mz_statement_execution_history`. We need to keep this
/// around in order to modify the entry later once the statement finishes executing.
#[derive(Clone, Debug)]
pub struct StatementBeganExecutionRecord {
    /// Identifier of this statement execution.
    /// NOTE(review): presumably the same value later carried by
    /// `StatementEndedExecutionRecord::id` -- confirm at the use sites.
    pub id: Uuid,
    /// NOTE(review): presumably the `id` of the matching
    /// `StatementPreparedRecord` -- confirm at the use sites.
    pub prepared_statement_id: Uuid,
    /// Sample rate associated with this execution.
    /// NOTE(review): range and exact meaning are not visible in this file.
    pub sample_rate: f64,
    /// Statement parameters rendered as strings; an entry may be `None`.
    /// NOTE(review): `None` presumably stands for a SQL NULL -- confirm.
    pub params: Vec<Option<String>>,
    /// When execution began, as an `EpochMillis` value.
    pub began_at: EpochMillis,
    /// Cluster identifier, when one is known.
    pub cluster_id: Option<ClusterId>,
    /// Cluster name, when one is known.
    pub cluster_name: Option<String>,
    /// Database name recorded for the execution.
    pub database_name: String,
    /// Search path recorded for the execution.
    pub search_path: Vec<String>,
    /// Application name recorded for the execution.
    pub application_name: String,
    /// Transaction isolation level, stored as a string.
    pub transaction_isolation: String,
    /// Execution timestamp, when one is known.
    pub execution_timestamp: Option<EpochMillis>,
    /// Identifier of the transaction the statement ran in.
    pub transaction_id: TransactionId,
    /// Identifier of a transient index, when one is recorded.
    /// NOTE(review): presumably the index created to serve this statement -- confirm.
    pub transient_index_id: Option<GlobalId>,
    /// Materialize version string recorded with the entry.
    pub mz_version: String,
}
62
/// How a statement's result was produced.
#[derive(Clone, Copy, Debug)]
pub enum StatementExecutionStrategy {
    /// A dataflow was spun up to execute the statement.
    Standard,
    /// The result was read from an arrangement that already existed.
    FastPath,
    /// Experimental: the result was read from a persist collection that
    /// already existed.
    PersistFastPath,
    /// environmentd determined the statement to be constant, so it was
    /// never sent to a cluster.
    Constant,
}

impl StatementExecutionStrategy {
    /// Returns the kebab-case label for this strategy.
    pub fn name(&self) -> &'static str {
        match *self {
            StatementExecutionStrategy::Standard => "standard",
            StatementExecutionStrategy::FastPath => "fast-path",
            StatementExecutionStrategy::PersistFastPath => "persist-fast-path",
            StatementExecutionStrategy::Constant => "constant",
        }
    }
}
88
/// Why the execution of a statement ended.
#[derive(Clone, Debug)]
pub enum StatementEndedExecutionReason {
    /// The statement completed successfully.
    Success {
        /// Size of the result, when measured. The `From<&ExecuteResponse>`
        /// conversion in this file sets it to the summed `byte_len()` of the
        /// rows for `SendingRowsImmediate`, and to `None` for responses that
        /// carry no rows.
        result_size: Option<u64>,
        /// Number of rows returned, when known.
        rows_returned: Option<u64>,
        /// Strategy used to produce the result, when known.
        execution_strategy: Option<StatementExecutionStrategy>,
    },
    /// The statement was canceled.
    /// NOTE(review): how this differs from `Aborted` is not visible in this
    /// file -- confirm at the use sites.
    Canceled,
    /// The statement failed.
    Errored {
        /// Error text. The `From<&Result<ExecuteResponse, AdapterError>>`
        /// conversion in this file stores the error's `to_string()` output.
        error: String,
    },
    /// The statement was aborted.
    /// NOTE(review): the conditions that produce this variant are not visible
    /// in this file -- confirm at the use sites.
    Aborted,
}
102
/// Records that a statement's execution has ended, and why.
#[derive(Clone, Debug)]
pub struct StatementEndedExecutionRecord {
    /// Identifier of the execution that ended.
    /// NOTE(review): presumably matches `StatementBeganExecutionRecord::id` -- confirm.
    pub id: Uuid,
    /// Why the execution ended.
    pub reason: StatementEndedExecutionReason,
    /// When the execution ended, as an `EpochMillis` value.
    pub ended_at: EpochMillis,
}
109
/// Contains all the information necessary to generate an entry in
/// `mz_prepared_statement_history`
#[derive(Clone, Debug)]
pub struct StatementPreparedRecord {
    /// Identifier of the prepared statement.
    pub id: Uuid,
    /// 32-byte hash of the statement's SQL.
    /// NOTE(review): the hash algorithm is not visible in this file.
    pub sql_hash: [u8; 32],
    /// Name of the prepared statement.
    pub name: String,
    /// Identifier of the session the statement belongs to.
    /// NOTE(review): presumably matches `SessionHistoryEvent::id` -- confirm.
    pub session_id: Uuid,
    /// When the statement was prepared, as an `EpochMillis` value.
    pub prepared_at: EpochMillis,
    /// Kind of the statement, when known.
    pub kind: Option<StatementKind>,
}
121
/// An event produced by statement logging; each variant wraps the record
/// describing what happened.
#[derive(Clone, Debug)]
pub enum StatementLoggingEvent {
    /// A statement was prepared.
    Prepared(StatementPreparedRecord),
    /// A statement began executing.
    BeganExecution(StatementBeganExecutionRecord),
    /// A statement finished executing.
    EndedExecution(StatementEndedExecutionRecord),
    /// A session began.
    BeganSession(SessionHistoryEvent),
}
129
/// Describes a session; carried by `StatementLoggingEvent::BeganSession`.
#[derive(Clone, Debug)]
pub struct SessionHistoryEvent {
    /// Identifier of the session.
    pub id: Uuid,
    /// When the session connected, as an `EpochMillis` value.
    pub connected_at: EpochMillis,
    /// Application name recorded for the session.
    pub application_name: String,
    /// Name of the authenticated user.
    pub authenticated_user: String,
}
137
138impl From<&Result<ExecuteResponse, AdapterError>> for StatementEndedExecutionReason {
139    fn from(value: &Result<ExecuteResponse, AdapterError>) -> StatementEndedExecutionReason {
140        match value {
141            Ok(resp) => resp.into(),
142            Err(e) => StatementEndedExecutionReason::Errored {
143                error: e.to_string(),
144            },
145        }
146    }
147}
148
149impl From<&ExecuteResponse> for StatementEndedExecutionReason {
150    fn from(value: &ExecuteResponse) -> StatementEndedExecutionReason {
151        match value {
152            ExecuteResponse::CopyTo { resp, .. } => match resp.as_ref() {
153                // NB [btv]: It's not clear that this combination
154                // can ever actually happen.
155                ExecuteResponse::SendingRowsImmediate { rows, .. } => {
156                    // Note(parkmycar): It potentially feels bad here to iterate over the entire
157                    // iterator _just_ to get the encoded result size. As noted above, it's not
158                    // entirely clear this case ever happens, so the simplicity is worth it.
159                    let result_size: usize = rows.box_clone().map(|row| row.byte_len()).sum();
160                    StatementEndedExecutionReason::Success {
161                        result_size: Some(u64::cast_from(result_size)),
162                        rows_returned: Some(u64::cast_from(rows.count())),
163                        execution_strategy: Some(StatementExecutionStrategy::Constant),
164                    }
165                }
166                ExecuteResponse::SendingRows { .. } => {
167                    panic!("SELECTs terminate on peek finalization, not here.")
168                }
169                ExecuteResponse::Subscribing { .. } => {
170                    panic!("SUBSCRIBEs terminate in the protocol layer, not here.")
171                }
172                _ => panic!("Invalid COPY response type"),
173            },
174            ExecuteResponse::CopyFrom { .. } => {
175                panic!("COPY FROMs terminate in the protocol layer, not here.")
176            }
177            ExecuteResponse::Fetch { .. } => {
178                panic!("FETCHes terminate after a follow-up message is sent.")
179            }
180            ExecuteResponse::SendingRows { .. } => {
181                panic!("SELECTs terminate on peek finalization, not here.")
182            }
183            ExecuteResponse::Subscribing { .. } => {
184                panic!("SUBSCRIBEs terminate in the protocol layer, not here.")
185            }
186
187            ExecuteResponse::SendingRowsImmediate { rows, .. } => {
188                // Note(parkmycar): It potentially feels bad here to iterate over the entire
189                // iterator _just_ to get the encoded result size, the number of Rows returned here
190                // shouldn't be too large though. An alternative is to pre-compute some of the
191                // result size, but that would require always decoding Rows to handle projecting
192                // away columns, which has a negative impact for much larger response sizes.
193                let result_size: usize = rows.box_clone().map(|row| row.byte_len()).sum();
194                StatementEndedExecutionReason::Success {
195                    result_size: Some(u64::cast_from(result_size)),
196                    rows_returned: Some(u64::cast_from(rows.count())),
197                    execution_strategy: Some(StatementExecutionStrategy::Constant),
198                }
199            }
200
201            ExecuteResponse::AlteredDefaultPrivileges
202            | ExecuteResponse::AlteredObject(_)
203            | ExecuteResponse::AlteredRole
204            | ExecuteResponse::AlteredSystemConfiguration
205            | ExecuteResponse::ClosedCursor
206            | ExecuteResponse::Comment
207            | ExecuteResponse::Copied(_)
208            | ExecuteResponse::CreatedConnection
209            | ExecuteResponse::CreatedDatabase
210            | ExecuteResponse::CreatedSchema
211            | ExecuteResponse::CreatedRole
212            | ExecuteResponse::CreatedCluster
213            | ExecuteResponse::CreatedClusterReplica
214            | ExecuteResponse::CreatedIndex
215            | ExecuteResponse::CreatedIntrospectionSubscribe
216            | ExecuteResponse::CreatedSecret
217            | ExecuteResponse::CreatedSink
218            | ExecuteResponse::CreatedSource
219            | ExecuteResponse::CreatedTable
220            | ExecuteResponse::CreatedView
221            | ExecuteResponse::CreatedViews
222            | ExecuteResponse::CreatedMaterializedView
223            | ExecuteResponse::CreatedContinualTask
224            | ExecuteResponse::CreatedType
225            | ExecuteResponse::CreatedNetworkPolicy
226            | ExecuteResponse::Deallocate { .. }
227            | ExecuteResponse::DeclaredCursor
228            | ExecuteResponse::Deleted(_)
229            | ExecuteResponse::DiscardedTemp
230            | ExecuteResponse::DiscardedAll
231            | ExecuteResponse::DroppedObject(_)
232            | ExecuteResponse::DroppedOwned
233            | ExecuteResponse::EmptyQuery
234            | ExecuteResponse::GrantedPrivilege
235            | ExecuteResponse::GrantedRole
236            | ExecuteResponse::Inserted(_)
237            | ExecuteResponse::Prepare
238            | ExecuteResponse::Raised
239            | ExecuteResponse::ReassignOwned
240            | ExecuteResponse::RevokedPrivilege
241            | ExecuteResponse::RevokedRole
242            | ExecuteResponse::SetVariable { .. }
243            | ExecuteResponse::StartedTransaction
244            | ExecuteResponse::TransactionCommitted { .. }
245            | ExecuteResponse::TransactionRolledBack { .. }
246            | ExecuteResponse::Updated(_)
247            | ExecuteResponse::ValidatedConnection { .. } => {
248                StatementEndedExecutionReason::Success {
249                    result_size: None,
250                    rows_returned: None,
251                    execution_strategy: None,
252                }
253            }
254        }
255    }
256}