mz_audit_log/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Audit log data structures.
11//!
12//! The audit log is logging that is produced by user actions and consumed
13//! by users in the form of the `mz_catalog.mz_audit_events` SQL table and
14//! by the cloud management layer for billing and introspection. This crate
15//! is designed to make the production and consumption of the logs type
16//! safe. Events and their metadata are versioned and the data structures
17//! replicated here so that if the data change in some other crate, a
18//! new version here can be made. This avoids needing to poke at the data
19//! when reading it to determine what it means and should have full backward
20//! compatibility. This is its own crate so that production and consumption can
21//! be in different processes and production is not allowed to specify private
22//! data structures unknown to the reader.
23
24use mz_ore::now::EpochMillis;
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27
28/// New version variants should be added if fields need to be added, changed, or removed.
29#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
30pub enum VersionedEvent {
31    V1(EventV1),
32}
33
34impl VersionedEvent {
35    /// Create a new event. This function must always require and produce the most
36    /// recent variant of VersionedEvent. `id` must be a globally increasing,
37    /// ordered number such that sorting by it on all events yields the order
38    /// of events by users. It is insufficient to use `occurred_at` (even at
39    /// nanosecond precision) due to clock unpredictability.
40    pub fn new(
41        id: u64,
42        event_type: EventType,
43        object_type: ObjectType,
44        details: EventDetails,
45        user: Option<String>,
46        occurred_at: EpochMillis,
47    ) -> Self {
48        Self::V1(EventV1::new(
49            id,
50            event_type,
51            object_type,
52            details,
53            user,
54            occurred_at,
55        ))
56    }
57
58    // Implement deserialize and serialize so writers and readers don't have to
59    // coordinate about which Serializer to use.
60    pub fn deserialize(data: &[u8]) -> Result<Self, anyhow::Error> {
61        Ok(serde_json::from_slice(data)?)
62    }
63
64    pub fn serialize(&self) -> Vec<u8> {
65        serde_json::to_vec(self).expect("must serialize")
66    }
67
68    /// Returns a globally sortable event order. All event versions must have this
69    /// field.
70    pub fn sortable_id(&self) -> u64 {
71        match self {
72            VersionedEvent::V1(ev) => ev.id,
73        }
74    }
75}
76
77#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
78#[serde(rename_all = "kebab-case")]
79pub enum EventType {
80    Create,
81    Drop,
82    Alter,
83    Grant,
84    Revoke,
85    Comment,
86}
87
88impl EventType {
89    pub fn as_title_case(&self) -> &'static str {
90        match self {
91            EventType::Create => "Created",
92            EventType::Drop => "Dropped",
93            EventType::Alter => "Altered",
94            EventType::Grant => "Granted",
95            EventType::Revoke => "Revoked",
96            EventType::Comment => "Comment",
97        }
98    }
99}
100
101serde_plain::derive_display_from_serialize!(EventType);
102
103#[derive(
104    Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary,
105)]
106#[serde(rename_all = "kebab-case")]
107pub enum ObjectType {
108    Cluster,
109    ClusterReplica,
110    Connection,
111    ContinualTask,
112    Database,
113    Func,
114    Index,
115    MaterializedView,
116    NetworkPolicy,
117    Role,
118    Secret,
119    Schema,
120    Sink,
121    Source,
122    System,
123    Table,
124    Type,
125    View,
126}
127
128impl ObjectType {
129    pub fn as_title_case(&self) -> &'static str {
130        match self {
131            ObjectType::Cluster => "Cluster",
132            ObjectType::ClusterReplica => "Cluster Replica",
133            ObjectType::Connection => "Connection",
134            ObjectType::ContinualTask => "Continual Task",
135            ObjectType::Database => "Database",
136            ObjectType::Func => "Function",
137            ObjectType::Index => "Index",
138            ObjectType::MaterializedView => "Materialized View",
139            ObjectType::NetworkPolicy => "Network Policy",
140            ObjectType::Role => "Role",
141            ObjectType::Schema => "Schema",
142            ObjectType::Secret => "Secret",
143            ObjectType::Sink => "Sink",
144            ObjectType::Source => "Source",
145            ObjectType::System => "System",
146            ObjectType::Table => "Table",
147            ObjectType::Type => "Type",
148            ObjectType::View => "View",
149        }
150    }
151}
152
153serde_plain::derive_display_from_serialize!(ObjectType);
154
155#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
156pub enum EventDetails {
157    #[serde(rename = "CreateComputeReplicaV1")] // historical name
158    CreateClusterReplicaV1(CreateClusterReplicaV1),
159    CreateClusterReplicaV2(CreateClusterReplicaV2),
160    CreateClusterReplicaV3(CreateClusterReplicaV3),
161    CreateClusterReplicaV4(CreateClusterReplicaV4),
162    #[serde(rename = "DropComputeReplicaV1")] // historical name
163    DropClusterReplicaV1(DropClusterReplicaV1),
164    DropClusterReplicaV2(DropClusterReplicaV2),
165    DropClusterReplicaV3(DropClusterReplicaV3),
166    CreateSourceSinkV1(CreateSourceSinkV1),
167    CreateSourceSinkV2(CreateSourceSinkV2),
168    CreateSourceSinkV3(CreateSourceSinkV3),
169    CreateSourceSinkV4(CreateSourceSinkV4),
170    CreateIndexV1(CreateIndexV1),
171    CreateMaterializedViewV1(CreateMaterializedViewV1),
172    AlterApplyReplacementV1(AlterApplyReplacementV1),
173    AlterSetClusterV1(AlterSetClusterV1),
174    AlterSourceSinkV1(AlterSourceSinkV1),
175    GrantRoleV1(GrantRoleV1),
176    GrantRoleV2(GrantRoleV2),
177    RevokeRoleV1(RevokeRoleV1),
178    RevokeRoleV2(RevokeRoleV2),
179    UpdatePrivilegeV1(UpdatePrivilegeV1),
180    AlterDefaultPrivilegeV1(AlterDefaultPrivilegeV1),
181    UpdateOwnerV1(UpdateOwnerV1),
182    IdFullNameV1(IdFullNameV1),
183    RenameClusterV1(RenameClusterV1),
184    RenameClusterReplicaV1(RenameClusterReplicaV1),
185    RenameItemV1(RenameItemV1),
186    IdNameV1(IdNameV1),
187    SchemaV1(SchemaV1),
188    SchemaV2(SchemaV2),
189    UpdateItemV1(UpdateItemV1),
190    RenameSchemaV1(RenameSchemaV1),
191    AlterRetainHistoryV1(AlterRetainHistoryV1),
192    ToNewIdV1(ToNewIdV1),
193    FromPreviousIdV1(FromPreviousIdV1),
194    SetV1(SetV1),
195    ResetAllV1,
196    RotateKeysV1(RotateKeysV1),
197}
198
199#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
200pub struct SetV1 {
201    pub name: String,
202    pub value: Option<String>,
203}
204
205#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
206pub struct RotateKeysV1 {
207    pub id: String,
208    pub name: String,
209}
210
211#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
212pub struct IdFullNameV1 {
213    pub id: String,
214    #[serde(flatten)]
215    pub name: FullNameV1,
216}
217
218#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
219pub struct FullNameV1 {
220    pub database: String,
221    pub schema: String,
222    pub item: String,
223}
224
225#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
226pub struct IdNameV1 {
227    pub id: String,
228    pub name: String,
229}
230
231#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
232pub struct RenameItemV1 {
233    pub id: String,
234    pub old_name: FullNameV1,
235    pub new_name: FullNameV1,
236}
237
238#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
239pub struct RenameClusterV1 {
240    pub id: String,
241    pub old_name: String,
242    pub new_name: String,
243}
244
245#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
246pub struct RenameClusterReplicaV1 {
247    pub cluster_id: String,
248    pub replica_id: String,
249    pub old_name: String,
250    pub new_name: String,
251}
252
253#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
254pub struct DropClusterReplicaV1 {
255    pub cluster_id: String,
256    pub cluster_name: String,
257    // Events that predate v0.32.0 will not have this field set.
258    #[serde(skip_serializing_if = "Option::is_none")]
259    pub replica_id: Option<String>,
260    pub replica_name: String,
261}
262
263#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
264pub struct DropClusterReplicaV2 {
265    pub cluster_id: String,
266    pub cluster_name: String,
267    pub replica_id: Option<String>,
268    pub replica_name: String,
269    pub reason: CreateOrDropClusterReplicaReasonV1,
270    #[serde(skip_serializing_if = "Option::is_none")]
271    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
272}
273
274#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
275pub struct DropClusterReplicaV3 {
276    pub cluster_id: String,
277    pub cluster_name: String,
278    pub replica_id: Option<String>,
279    pub replica_name: String,
280    pub reason: CreateOrDropClusterReplicaReasonV1,
281    #[serde(skip_serializing_if = "Option::is_none")]
282    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
283}
284
285#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
286pub struct CreateClusterReplicaV1 {
287    pub cluster_id: String,
288    pub cluster_name: String,
289    // Events that predate v0.32.0 will not have this field set.
290    #[serde(skip_serializing_if = "Option::is_none")]
291    pub replica_id: Option<String>,
292    pub replica_name: String,
293    pub logical_size: String,
294    pub disk: bool,
295    pub billed_as: Option<String>,
296    pub internal: bool,
297}
298
299#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
300pub struct CreateClusterReplicaV2 {
301    pub cluster_id: String,
302    pub cluster_name: String,
303    pub replica_id: Option<String>,
304    pub replica_name: String,
305    pub logical_size: String,
306    pub disk: bool,
307    pub billed_as: Option<String>,
308    pub internal: bool,
309    pub reason: CreateOrDropClusterReplicaReasonV1,
310    #[serde(skip_serializing_if = "Option::is_none")]
311    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
312}
313
314#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
315pub struct CreateClusterReplicaV3 {
316    pub cluster_id: String,
317    pub cluster_name: String,
318    pub replica_id: Option<String>,
319    pub replica_name: String,
320    pub logical_size: String,
321    pub disk: bool,
322    pub billed_as: Option<String>,
323    pub internal: bool,
324    pub reason: CreateOrDropClusterReplicaReasonV1,
325    #[serde(skip_serializing_if = "Option::is_none")]
326    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
327}
328
329#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
330pub struct CreateClusterReplicaV4 {
331    pub cluster_id: String,
332    pub cluster_name: String,
333    pub replica_id: Option<String>,
334    pub replica_name: String,
335    pub logical_size: String,
336    pub billed_as: Option<String>,
337    pub internal: bool,
338    pub reason: CreateOrDropClusterReplicaReasonV1,
339    #[serde(skip_serializing_if = "Option::is_none")]
340    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
341}
342
343#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
344#[serde(rename_all = "kebab-case")]
345pub enum CreateOrDropClusterReplicaReasonV1 {
346    Manual,
347    Schedule,
348    System,
349}
350
351/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
352/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
353/// can be settings of the policy as well as other information about the state of the system.)
354#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
355pub struct SchedulingDecisionsWithReasonsV1 {
356    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
357    pub on_refresh: RefreshDecisionWithReasonV1,
358}
359
360/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
361/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
362/// can be settings of the policy as well as other information about the state of the system.)
363#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
364pub struct SchedulingDecisionsWithReasonsV2 {
365    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
366    pub on_refresh: RefreshDecisionWithReasonV2,
367}
368
369#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
370pub struct RefreshDecisionWithReasonV1 {
371    pub decision: SchedulingDecisionV1,
372    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
373    /// time estimate).
374    pub objects_needing_refresh: Vec<String>,
375    /// The HYDRATION TIME ESTIMATE setting of the cluster.
376    pub hydration_time_estimate: String,
377}
378
379#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
380pub struct RefreshDecisionWithReasonV2 {
381    pub decision: SchedulingDecisionV1,
382    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
383    /// time estimate), and therefore should keep the cluster On.
384    pub objects_needing_refresh: Vec<String>,
385    /// Objects for which we estimate that they currently need Persist compaction, and therefore
386    /// should keep the cluster On.
387    pub objects_needing_compaction: Vec<String>,
388    /// The HYDRATION TIME ESTIMATE setting of the cluster.
389    pub hydration_time_estimate: String,
390}
391
392#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
393#[serde(rename_all = "kebab-case")]
394pub enum SchedulingDecisionV1 {
395    On,
396    Off,
397}
398
399impl From<bool> for SchedulingDecisionV1 {
400    fn from(value: bool) -> Self {
401        match value {
402            true => SchedulingDecisionV1::On,
403            false => SchedulingDecisionV1::Off,
404        }
405    }
406}
407
408#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
409pub struct CreateSourceSinkV1 {
410    pub id: String,
411    #[serde(flatten)]
412    pub name: FullNameV1,
413    pub size: Option<String>,
414}
415
416#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
417pub struct CreateSourceSinkV2 {
418    pub id: String,
419    #[serde(flatten)]
420    pub name: FullNameV1,
421    pub size: Option<String>,
422    #[serde(rename = "type")]
423    pub external_type: String,
424}
425
426#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
427pub struct CreateSourceSinkV3 {
428    pub id: String,
429    #[serde(flatten)]
430    pub name: FullNameV1,
431    #[serde(rename = "type")]
432    pub external_type: String,
433}
434
435#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
436pub struct CreateSourceSinkV4 {
437    pub id: String,
438    pub cluster_id: Option<String>,
439    #[serde(flatten)]
440    pub name: FullNameV1,
441    #[serde(rename = "type")]
442    pub external_type: String,
443}
444
445#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
446pub struct CreateIndexV1 {
447    pub id: String,
448    pub cluster_id: String,
449    #[serde(flatten)]
450    pub name: FullNameV1,
451}
452
453#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
454pub struct CreateMaterializedViewV1 {
455    pub id: String,
456    pub cluster_id: String,
457    #[serde(flatten)]
458    pub name: FullNameV1,
459    #[serde(skip_serializing_if = "Option::is_none")]
460    pub replacement_target_id: Option<String>,
461}
462
463#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
464pub struct AlterApplyReplacementV1 {
465    #[serde(flatten)]
466    pub target: IdFullNameV1,
467    pub replacement: IdFullNameV1,
468}
469
470#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
471pub struct AlterSourceSinkV1 {
472    pub id: String,
473    #[serde(flatten)]
474    pub name: FullNameV1,
475    pub old_size: Option<String>,
476    pub new_size: Option<String>,
477}
478
479#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
480pub struct AlterSetClusterV1 {
481    pub id: String,
482    #[serde(flatten)]
483    pub name: FullNameV1,
484    pub old_cluster_id: String,
485    pub new_cluster_id: String,
486}
487
488#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
489pub struct GrantRoleV1 {
490    pub role_id: String,
491    pub member_id: String,
492    pub grantor_id: String,
493}
494
495#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
496pub struct GrantRoleV2 {
497    pub role_id: String,
498    pub member_id: String,
499    pub grantor_id: String,
500    pub executed_by: String,
501}
502
503#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
504pub struct RevokeRoleV1 {
505    pub role_id: String,
506    pub member_id: String,
507}
508
509#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
510pub struct RevokeRoleV2 {
511    pub role_id: String,
512    pub member_id: String,
513    pub grantor_id: String,
514    pub executed_by: String,
515}
516
517#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
518pub struct UpdatePrivilegeV1 {
519    pub object_id: String,
520    pub grantee_id: String,
521    pub grantor_id: String,
522    pub privileges: String,
523}
524
525#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
526pub struct AlterDefaultPrivilegeV1 {
527    pub role_id: String,
528    pub database_id: Option<String>,
529    pub schema_id: Option<String>,
530    pub grantee_id: String,
531    pub privileges: String,
532}
533
534#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
535pub struct UpdateOwnerV1 {
536    pub object_id: String,
537    pub old_owner_id: String,
538    pub new_owner_id: String,
539}
540
541#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
542pub struct SchemaV1 {
543    pub id: String,
544    pub name: String,
545    pub database_name: String,
546}
547
548#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
549pub struct SchemaV2 {
550    pub id: String,
551    pub name: String,
552    pub database_name: Option<String>,
553}
554
555#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
556pub struct RenameSchemaV1 {
557    pub id: String,
558    pub database_name: Option<String>,
559    pub old_name: String,
560    pub new_name: String,
561}
562
563#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
564pub struct AlterRetainHistoryV1 {
565    pub id: String,
566    pub old_history: Option<String>,
567    pub new_history: Option<String>,
568}
569
570#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
571pub struct UpdateItemV1 {
572    pub id: String,
573    #[serde(flatten)]
574    pub name: FullNameV1,
575}
576
577#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
578pub struct ToNewIdV1 {
579    pub id: String,
580    pub new_id: String,
581}
582
583#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
584pub struct FromPreviousIdV1 {
585    pub id: String,
586    pub previous_id: String,
587}
588
589impl EventDetails {
590    pub fn as_json(&self) -> serde_json::Value {
591        match self {
592            EventDetails::CreateClusterReplicaV1(v) => {
593                serde_json::to_value(v).expect("must serialize")
594            }
595            EventDetails::CreateClusterReplicaV2(v) => {
596                serde_json::to_value(v).expect("must serialize")
597            }
598            EventDetails::CreateClusterReplicaV3(v) => {
599                serde_json::to_value(v).expect("must serialize")
600            }
601            EventDetails::CreateClusterReplicaV4(v) => {
602                serde_json::to_value(v).expect("must serialize")
603            }
604            EventDetails::DropClusterReplicaV1(v) => {
605                serde_json::to_value(v).expect("must serialize")
606            }
607            EventDetails::DropClusterReplicaV2(v) => {
608                serde_json::to_value(v).expect("must serialize")
609            }
610            EventDetails::DropClusterReplicaV3(v) => {
611                serde_json::to_value(v).expect("must serialize")
612            }
613            EventDetails::IdFullNameV1(v) => serde_json::to_value(v).expect("must serialize"),
614            EventDetails::RenameClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
615            EventDetails::RenameClusterReplicaV1(v) => {
616                serde_json::to_value(v).expect("must serialize")
617            }
618            EventDetails::RenameItemV1(v) => serde_json::to_value(v).expect("must serialize"),
619            EventDetails::IdNameV1(v) => serde_json::to_value(v).expect("must serialize"),
620            EventDetails::SchemaV1(v) => serde_json::to_value(v).expect("must serialize"),
621            EventDetails::SchemaV2(v) => serde_json::to_value(v).expect("must serialize"),
622            EventDetails::RenameSchemaV1(v) => serde_json::to_value(v).expect("must serialize"),
623            EventDetails::CreateSourceSinkV1(v) => serde_json::to_value(v).expect("must serialize"),
624            EventDetails::CreateSourceSinkV2(v) => serde_json::to_value(v).expect("must serialize"),
625            EventDetails::CreateSourceSinkV3(v) => serde_json::to_value(v).expect("must serialize"),
626            EventDetails::CreateSourceSinkV4(v) => serde_json::to_value(v).expect("must serialize"),
627            EventDetails::CreateIndexV1(v) => serde_json::to_value(v).expect("must serialize"),
628            EventDetails::CreateMaterializedViewV1(v) => {
629                serde_json::to_value(v).expect("must serialize")
630            }
631            EventDetails::AlterApplyReplacementV1(v) => {
632                serde_json::to_value(v).expect("must serialize")
633            }
634            EventDetails::AlterSourceSinkV1(v) => serde_json::to_value(v).expect("must serialize"),
635            EventDetails::AlterSetClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
636            EventDetails::GrantRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
637            EventDetails::GrantRoleV2(v) => serde_json::to_value(v).expect("must serialize"),
638            EventDetails::RevokeRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
639            EventDetails::RevokeRoleV2(v) => serde_json::to_value(v).expect("must serialize"),
640            EventDetails::UpdatePrivilegeV1(v) => serde_json::to_value(v).expect("must serialize"),
641            EventDetails::AlterDefaultPrivilegeV1(v) => {
642                serde_json::to_value(v).expect("must serialize")
643            }
644            EventDetails::UpdateOwnerV1(v) => serde_json::to_value(v).expect("must serialize"),
645            EventDetails::UpdateItemV1(v) => serde_json::to_value(v).expect("must serialize"),
646            EventDetails::AlterRetainHistoryV1(v) => {
647                serde_json::to_value(v).expect("must serialize")
648            }
649            EventDetails::ToNewIdV1(v) => serde_json::to_value(v).expect("must serialize"),
650            EventDetails::FromPreviousIdV1(v) => serde_json::to_value(v).expect("must serialize"),
651            EventDetails::SetV1(v) => serde_json::to_value(v).expect("must serialize"),
652            EventDetails::ResetAllV1 => serde_json::Value::Null,
653            EventDetails::RotateKeysV1(v) => serde_json::to_value(v).expect("must serialize"),
654        }
655    }
656}
657
658#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
659pub struct EventV1 {
660    pub id: u64,
661    pub event_type: EventType,
662    pub object_type: ObjectType,
663    pub details: EventDetails,
664    pub user: Option<String>,
665    pub occurred_at: EpochMillis,
666}
667
668impl EventV1 {
669    fn new(
670        id: u64,
671        event_type: EventType,
672        object_type: ObjectType,
673        details: EventDetails,
674        user: Option<String>,
675        occurred_at: EpochMillis,
676    ) -> EventV1 {
677        EventV1 {
678            id,
679            event_type,
680            object_type,
681            details,
682            user,
683            occurred_at,
684        }
685    }
686}
687
688#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
689pub struct StorageUsageV1 {
690    pub id: u64,
691    pub shard_id: Option<String>,
692    pub size_bytes: u64,
693    pub collection_timestamp: EpochMillis,
694}
695
696impl StorageUsageV1 {
697    pub fn new(
698        id: u64,
699        shard_id: Option<String>,
700        size_bytes: u64,
701        collection_timestamp: EpochMillis,
702    ) -> StorageUsageV1 {
703        StorageUsageV1 {
704            id,
705            shard_id,
706            size_bytes,
707            collection_timestamp,
708        }
709    }
710}
711
712/// Describes the environment's storage usage at a point in time.
713///
714/// This type is persisted in the catalog across restarts, so any updates to the
715/// schema will require a new version.
716#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
717pub enum VersionedStorageUsage {
718    V1(StorageUsageV1),
719}
720
721impl VersionedStorageUsage {
722    /// Create a new metric snapshot.
723    /// This function must always require and produce the most
724    /// recent variant of VersionedStorageMetrics.
725    pub fn new(
726        id: u64,
727        object_id: Option<String>,
728        size_bytes: u64,
729        collection_timestamp: EpochMillis,
730    ) -> Self {
731        Self::V1(StorageUsageV1::new(
732            id,
733            object_id,
734            size_bytes,
735            collection_timestamp,
736        ))
737    }
738
739    // Implement deserialize and serialize so writers and readers don't have to
740    // coordinate about which Serializer to use.
741    pub fn deserialize(data: &[u8]) -> Result<Self, anyhow::Error> {
742        Ok(serde_json::from_slice(data)?)
743    }
744
745    pub fn serialize(&self) -> Vec<u8> {
746        serde_json::to_vec(self).expect("must serialize")
747    }
748
749    pub fn timestamp(&self) -> EpochMillis {
750        match self {
751            VersionedStorageUsage::V1(StorageUsageV1 {
752                collection_timestamp,
753                ..
754            }) => *collection_timestamp,
755        }
756    }
757
758    /// Returns a globally sortable event order. All event versions must have this
759    /// field.
760    pub fn sortable_id(&self) -> u64 {
761        match self {
762            VersionedStorageUsage::V1(usage) => usage.id,
763        }
764    }
765}
766
767#[cfg(test)]
768mod tests {
769    use crate::{EventDetails, EventType, EventV1, IdNameV1, ObjectType, VersionedEvent};
770
771    // Test all versions of events. This test hard codes bytes so that
772    // programmers are not able to change data structures here without this test
773    // failing. Instead of changing data structures, add new variants.
774    #[mz_ore::test]
775    fn test_audit_log() -> Result<(), anyhow::Error> {
776        let cases: Vec<(VersionedEvent, &'static str)> = vec![(
777            VersionedEvent::V1(EventV1::new(
778                2,
779                EventType::Drop,
780                ObjectType::ClusterReplica,
781                EventDetails::IdNameV1(IdNameV1 {
782                    id: "u1".to_string(),
783                    name: "name".into(),
784                }),
785                None,
786                2,
787            )),
788            r#"{"V1":{"id":2,"event_type":"drop","object_type":"cluster-replica","details":{"IdNameV1":{"id":"u1","name":"name"}},"user":null,"occurred_at":2}}"#,
789        )];
790
791        for (event, expected_bytes) in cases {
792            let event_bytes = serde_json::to_vec(&event).unwrap();
793            assert_eq!(
794                event_bytes,
795                expected_bytes.as_bytes(),
796                "expected bytes {}, got {}",
797                expected_bytes,
798                std::str::from_utf8(&event_bytes).unwrap(),
799            );
800        }
801
802        Ok(())
803    }
804}