1use mz_controller_types::ClusterId;
11use mz_ore::cast::CastFrom;
12use mz_ore::now::EpochMillis;
13use mz_repr::{GlobalId, RowIterator};
14use mz_sql_parser::ast::StatementKind;
15use uuid::Uuid;
16
17use crate::session::TransactionId;
18use crate::{AdapterError, ExecuteResponse};
19
20#[derive(Clone, Debug)]
21pub enum StatementLifecycleEvent {
22 ExecutionBegan,
23 OptimizationFinished,
24 StorageDependenciesFinished,
25 ComputeDependenciesFinished,
26 ExecutionFinished,
27}
28
29impl StatementLifecycleEvent {
30 pub fn as_str(&self) -> &str {
31 match self {
32 Self::ExecutionBegan => "execution-began",
33 Self::OptimizationFinished => "optimization-finished",
34 Self::StorageDependenciesFinished => "storage-dependencies-finished",
35 Self::ComputeDependenciesFinished => "compute-dependencies-finished",
36 Self::ExecutionFinished => "execution-finished",
37 }
38 }
39}
40
41#[derive(Clone, Debug)]
45pub struct StatementBeganExecutionRecord {
46 pub id: Uuid,
47 pub prepared_statement_id: Uuid,
48 pub sample_rate: f64,
49 pub params: Vec<Option<String>>,
50 pub began_at: EpochMillis,
51 pub cluster_id: Option<ClusterId>,
52 pub cluster_name: Option<String>,
53 pub database_name: String,
54 pub search_path: Vec<String>,
55 pub application_name: String,
56 pub transaction_isolation: String,
57 pub execution_timestamp: Option<EpochMillis>,
58 pub transaction_id: TransactionId,
59 pub transient_index_id: Option<GlobalId>,
60 pub mz_version: String,
61}
62
63#[derive(Clone, Copy, Debug)]
64pub enum StatementExecutionStrategy {
65 Standard,
67 FastPath,
70 PersistFastPath,
73 Constant,
76}
77
78impl StatementExecutionStrategy {
79 pub fn name(&self) -> &'static str {
80 match self {
81 Self::Standard => "standard",
82 Self::FastPath => "fast-path",
83 Self::PersistFastPath => "persist-fast-path",
84 Self::Constant => "constant",
85 }
86 }
87}
88
89#[derive(Clone, Debug)]
90pub enum StatementEndedExecutionReason {
91 Success {
92 result_size: Option<u64>,
93 rows_returned: Option<u64>,
94 execution_strategy: Option<StatementExecutionStrategy>,
95 },
96 Canceled,
97 Errored {
98 error: String,
99 },
100 Aborted,
101}
102
103#[derive(Clone, Debug)]
104pub struct StatementEndedExecutionRecord {
105 pub id: Uuid,
106 pub reason: StatementEndedExecutionReason,
107 pub ended_at: EpochMillis,
108}
109
110#[derive(Clone, Debug)]
113pub struct StatementPreparedRecord {
114 pub id: Uuid,
115 pub sql_hash: [u8; 32],
116 pub name: String,
117 pub session_id: Uuid,
118 pub prepared_at: EpochMillis,
119 pub kind: Option<StatementKind>,
120}
121
122#[derive(Clone, Debug)]
123pub enum StatementLoggingEvent {
124 Prepared(StatementPreparedRecord),
125 BeganExecution(StatementBeganExecutionRecord),
126 EndedExecution(StatementEndedExecutionRecord),
127 BeganSession(SessionHistoryEvent),
128}
129
130#[derive(Clone, Debug)]
131pub struct SessionHistoryEvent {
132 pub id: Uuid,
133 pub connected_at: EpochMillis,
134 pub application_name: String,
135 pub authenticated_user: String,
136}
137
138impl From<&Result<ExecuteResponse, AdapterError>> for StatementEndedExecutionReason {
139 fn from(value: &Result<ExecuteResponse, AdapterError>) -> StatementEndedExecutionReason {
140 match value {
141 Ok(resp) => resp.into(),
142 Err(e) => StatementEndedExecutionReason::Errored {
143 error: e.to_string(),
144 },
145 }
146 }
147}
148
149impl From<&ExecuteResponse> for StatementEndedExecutionReason {
150 fn from(value: &ExecuteResponse) -> StatementEndedExecutionReason {
151 match value {
152 ExecuteResponse::CopyTo { resp, .. } => match resp.as_ref() {
153 ExecuteResponse::SendingRowsImmediate { rows, .. } => {
156 let result_size: usize = rows.box_clone().map(|row| row.byte_len()).sum();
160 StatementEndedExecutionReason::Success {
161 result_size: Some(u64::cast_from(result_size)),
162 rows_returned: Some(u64::cast_from(rows.count())),
163 execution_strategy: Some(StatementExecutionStrategy::Constant),
164 }
165 }
166 ExecuteResponse::SendingRows { .. } => {
167 panic!("SELECTs terminate on peek finalization, not here.")
168 }
169 ExecuteResponse::Subscribing { .. } => {
170 panic!("SUBSCRIBEs terminate in the protocol layer, not here.")
171 }
172 _ => panic!("Invalid COPY response type"),
173 },
174 ExecuteResponse::CopyFrom { .. } => {
175 panic!("COPY FROMs terminate in the protocol layer, not here.")
176 }
177 ExecuteResponse::Fetch { .. } => {
178 panic!("FETCHes terminate after a follow-up message is sent.")
179 }
180 ExecuteResponse::SendingRows { .. } => {
181 panic!("SELECTs terminate on peek finalization, not here.")
182 }
183 ExecuteResponse::Subscribing { .. } => {
184 panic!("SUBSCRIBEs terminate in the protocol layer, not here.")
185 }
186
187 ExecuteResponse::SendingRowsImmediate { rows, .. } => {
188 let result_size: usize = rows.box_clone().map(|row| row.byte_len()).sum();
194 StatementEndedExecutionReason::Success {
195 result_size: Some(u64::cast_from(result_size)),
196 rows_returned: Some(u64::cast_from(rows.count())),
197 execution_strategy: Some(StatementExecutionStrategy::Constant),
198 }
199 }
200
201 ExecuteResponse::AlteredDefaultPrivileges
202 | ExecuteResponse::AlteredObject(_)
203 | ExecuteResponse::AlteredRole
204 | ExecuteResponse::AlteredSystemConfiguration
205 | ExecuteResponse::ClosedCursor
206 | ExecuteResponse::Comment
207 | ExecuteResponse::Copied(_)
208 | ExecuteResponse::CreatedConnection
209 | ExecuteResponse::CreatedDatabase
210 | ExecuteResponse::CreatedSchema
211 | ExecuteResponse::CreatedRole
212 | ExecuteResponse::CreatedCluster
213 | ExecuteResponse::CreatedClusterReplica
214 | ExecuteResponse::CreatedIndex
215 | ExecuteResponse::CreatedIntrospectionSubscribe
216 | ExecuteResponse::CreatedSecret
217 | ExecuteResponse::CreatedSink
218 | ExecuteResponse::CreatedSource
219 | ExecuteResponse::CreatedTable
220 | ExecuteResponse::CreatedView
221 | ExecuteResponse::CreatedViews
222 | ExecuteResponse::CreatedMaterializedView
223 | ExecuteResponse::CreatedContinualTask
224 | ExecuteResponse::CreatedType
225 | ExecuteResponse::CreatedNetworkPolicy
226 | ExecuteResponse::Deallocate { .. }
227 | ExecuteResponse::DeclaredCursor
228 | ExecuteResponse::Deleted(_)
229 | ExecuteResponse::DiscardedTemp
230 | ExecuteResponse::DiscardedAll
231 | ExecuteResponse::DroppedObject(_)
232 | ExecuteResponse::DroppedOwned
233 | ExecuteResponse::EmptyQuery
234 | ExecuteResponse::GrantedPrivilege
235 | ExecuteResponse::GrantedRole
236 | ExecuteResponse::Inserted(_)
237 | ExecuteResponse::Prepare
238 | ExecuteResponse::Raised
239 | ExecuteResponse::ReassignOwned
240 | ExecuteResponse::RevokedPrivilege
241 | ExecuteResponse::RevokedRole
242 | ExecuteResponse::SetVariable { .. }
243 | ExecuteResponse::StartedTransaction
244 | ExecuteResponse::TransactionCommitted { .. }
245 | ExecuteResponse::TransactionRolledBack { .. }
246 | ExecuteResponse::Updated(_)
247 | ExecuteResponse::ValidatedConnection { .. } => {
248 StatementEndedExecutionReason::Success {
249 result_size: None,
250 rows_returned: None,
251 execution_strategy: None,
252 }
253 }
254 }
255 }
256}