mz_audit_log/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Audit log data structures.
11//!
12//! The audit log is logging that is produced by user actions and consumed
13//! by users in the form of the `mz_catalog.mz_audit_events` SQL table and
14//! by the cloud management layer for billing and introspection. This crate
15//! is designed to make the production and consumption of the logs type
16//! safe. Events and their metadata are versioned and the data structures
17//! replicated here so that if the data change in some other crate, a
18//! new version here can be made. This avoids needing to poke at the data
19//! when reading it to determine what it means and should have full backward
20//! compatibility. This is its own crate so that production and consumption can
21//! be in different processes and production is not allowed to specify private
22//! data structures unknown to the reader.
23
24use mz_ore::now::EpochMillis;
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27
/// Versioned wrapper around an audit log event.
///
/// New version variants should be added if fields need to be added, changed, or removed.
/// With the derived serde impls the variant name (e.g. `V1`) is the outer key of the
/// serialized JSON object.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub enum VersionedEvent {
    /// Version 1 of the event schema.
    V1(EventV1),
}
33
34impl VersionedEvent {
35    /// Create a new event. This function must always require and produce the most
36    /// recent variant of VersionedEvent. `id` must be a globally increasing,
37    /// ordered number such that sorting by it on all events yields the order
38    /// of events by users. It is insufficient to use `occurred_at` (even at
39    /// nanosecond precision) due to clock unpredictability.
40    pub fn new(
41        id: u64,
42        event_type: EventType,
43        object_type: ObjectType,
44        details: EventDetails,
45        user: Option<String>,
46        occurred_at: EpochMillis,
47    ) -> Self {
48        Self::V1(EventV1::new(
49            id,
50            event_type,
51            object_type,
52            details,
53            user,
54            occurred_at,
55        ))
56    }
57
58    // Implement deserialize and serialize so writers and readers don't have to
59    // coordinate about which Serializer to use.
60    pub fn deserialize(data: &[u8]) -> Result<Self, anyhow::Error> {
61        Ok(serde_json::from_slice(data)?)
62    }
63
64    pub fn serialize(&self) -> Vec<u8> {
65        serde_json::to_vec(self).expect("must serialize")
66    }
67
68    /// Returns a globally sortable event order. All event versions must have this
69    /// field.
70    pub fn sortable_id(&self) -> u64 {
71        match self {
72            VersionedEvent::V1(ev) => ev.id,
73        }
74    }
75}
76
/// The kind of action an audit event records.
///
/// Variants are (de)serialized in kebab-case (e.g. `Drop` becomes `"drop"`). The derived
/// ordering follows the declaration order of the variants.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
#[serde(rename_all = "kebab-case")]
pub enum EventType {
    Create,
    Drop,
    Alter,
    Grant,
    Revoke,
    Comment,
}
87
88impl EventType {
89    pub fn as_title_case(&self) -> &'static str {
90        match self {
91            EventType::Create => "Created",
92            EventType::Drop => "Dropped",
93            EventType::Alter => "Altered",
94            EventType::Grant => "Granted",
95            EventType::Revoke => "Revoked",
96            EventType::Comment => "Comment",
97        }
98    }
99}
100
101serde_plain::derive_display_from_serialize!(EventType);
102
/// The kind of object an audit event refers to.
///
/// Variants are (de)serialized in kebab-case (e.g. `ClusterReplica` becomes
/// `"cluster-replica"`). The derived ordering follows the declaration order of the
/// variants.
#[derive(
    Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary,
)]
#[serde(rename_all = "kebab-case")]
pub enum ObjectType {
    Cluster,
    ClusterReplica,
    Connection,
    ContinualTask,
    Database,
    Func,
    Index,
    MaterializedView,
    NetworkPolicy,
    Role,
    Secret,
    Schema,
    Sink,
    Source,
    System,
    Table,
    Type,
    View,
}
127
128impl ObjectType {
129    pub fn as_title_case(&self) -> &'static str {
130        match self {
131            ObjectType::Cluster => "Cluster",
132            ObjectType::ClusterReplica => "Cluster Replica",
133            ObjectType::Connection => "Connection",
134            ObjectType::ContinualTask => "Continual Task",
135            ObjectType::Database => "Database",
136            ObjectType::Func => "Function",
137            ObjectType::Index => "Index",
138            ObjectType::MaterializedView => "Materialized View",
139            ObjectType::NetworkPolicy => "Network Policy",
140            ObjectType::Role => "Role",
141            ObjectType::Schema => "Schema",
142            ObjectType::Secret => "Secret",
143            ObjectType::Sink => "Sink",
144            ObjectType::Source => "Source",
145            ObjectType::System => "System",
146            ObjectType::Table => "Table",
147            ObjectType::Type => "Type",
148            ObjectType::View => "View",
149        }
150    }
151}
152
153serde_plain::derive_display_from_serialize!(ObjectType);
154
/// Event-specific payload attached to an audit event.
///
/// Every variant except `ResetAllV1` wraps a versioned payload struct. The variant name is
/// part of the serialized form (it is the key wrapping the payload), which is why two
/// variants carry a `rename` attribute that preserves their historical serialized name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub enum EventDetails {
    #[serde(rename = "CreateComputeReplicaV1")] // historical name
    CreateClusterReplicaV1(CreateClusterReplicaV1),
    CreateClusterReplicaV2(CreateClusterReplicaV2),
    CreateClusterReplicaV3(CreateClusterReplicaV3),
    #[serde(rename = "DropComputeReplicaV1")] // historical name
    DropClusterReplicaV1(DropClusterReplicaV1),
    DropClusterReplicaV2(DropClusterReplicaV2),
    DropClusterReplicaV3(DropClusterReplicaV3),
    CreateSourceSinkV1(CreateSourceSinkV1),
    CreateSourceSinkV2(CreateSourceSinkV2),
    CreateSourceSinkV3(CreateSourceSinkV3),
    CreateSourceSinkV4(CreateSourceSinkV4),
    CreateIndexV1(CreateIndexV1),
    CreateMaterializedViewV1(CreateMaterializedViewV1),
    AlterSetClusterV1(AlterSetClusterV1),
    AlterSourceSinkV1(AlterSourceSinkV1),
    GrantRoleV1(GrantRoleV1),
    GrantRoleV2(GrantRoleV2),
    RevokeRoleV1(RevokeRoleV1),
    RevokeRoleV2(RevokeRoleV2),
    UpdatePrivilegeV1(UpdatePrivilegeV1),
    AlterDefaultPrivilegeV1(AlterDefaultPrivilegeV1),
    UpdateOwnerV1(UpdateOwnerV1),
    IdFullNameV1(IdFullNameV1),
    RenameClusterV1(RenameClusterV1),
    RenameClusterReplicaV1(RenameClusterReplicaV1),
    RenameItemV1(RenameItemV1),
    IdNameV1(IdNameV1),
    SchemaV1(SchemaV1),
    SchemaV2(SchemaV2),
    UpdateItemV1(UpdateItemV1),
    RenameSchemaV1(RenameSchemaV1),
    AlterRetainHistoryV1(AlterRetainHistoryV1),
    ToNewIdV1(ToNewIdV1),
    FromPreviousIdV1(FromPreviousIdV1),
    SetV1(SetV1),
    /// Carries no payload.
    ResetAllV1,
    RotateKeysV1(RotateKeysV1),
}
196
/// Payload of [`EventDetails::SetV1`]: a name and an optional value.
// NOTE(review): presumably records a configuration parameter being set; the meaning of a
// `None` value is not visible here, so confirm with the producer.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct SetV1 {
    pub name: String,
    pub value: Option<String>,
}
202
/// Payload of [`EventDetails::RotateKeysV1`]: the id and name of the affected object.
// NOTE(review): the kind of object whose keys are rotated is not visible here; confirm.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct RotateKeysV1 {
    pub id: String,
    pub name: String,
}
208
/// Payload of [`EventDetails::IdFullNameV1`]: an object id together with its full name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct IdFullNameV1 {
    pub id: String,
    // Flattened: the `database`, `schema` and `item` keys of `FullNameV1` are serialized
    // directly alongside `id` instead of under a nested `name` key.
    #[serde(flatten)]
    pub name: FullNameV1,
}
215
/// A name split into its `database`, `schema` and `item` components.
///
/// Used on its own (e.g. in `RenameItemV1`) and flattened into other payloads.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct FullNameV1 {
    pub database: String,
    pub schema: String,
    pub item: String,
}
222
/// Payload of [`EventDetails::IdNameV1`]: an object id and a plain (unqualified) name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct IdNameV1 {
    pub id: String,
    pub name: String,
}
228
/// Payload of [`EventDetails::RenameItemV1`]: an item id with its old and new full names.
///
/// Unlike other payloads, the names are not flattened; they are serialized as nested
/// objects under `old_name` and `new_name`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct RenameItemV1 {
    pub id: String,
    pub old_name: FullNameV1,
    pub new_name: FullNameV1,
}
235
/// Payload of [`EventDetails::RenameClusterV1`]: a cluster id with its old and new names.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct RenameClusterV1 {
    pub id: String,
    pub old_name: String,
    pub new_name: String,
}
242
/// Payload of [`EventDetails::RenameClusterReplicaV1`]: the cluster and replica ids with
/// the replica's old and new names.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct RenameClusterReplicaV1 {
    pub cluster_id: String,
    pub replica_id: String,
    pub old_name: String,
    pub new_name: String,
}
250
/// Payload of [`EventDetails::DropClusterReplicaV1`], serialized under the historical
/// name `DropComputeReplicaV1`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct DropClusterReplicaV1 {
    pub cluster_id: String,
    pub cluster_name: String,
    // Events that predate v0.32.0 will not have this field set.
    // When `None`, the key is omitted from the serialized output entirely.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub replica_id: Option<String>,
    pub replica_name: String,
}
260
/// Payload of [`EventDetails::DropClusterReplicaV2`].
///
/// Compared to `DropClusterReplicaV1`, this adds `reason` and the optional
/// `scheduling_policies`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct DropClusterReplicaV2 {
    pub cluster_id: String,
    pub cluster_name: String,
    // No `skip_serializing_if` here: a `None` is written out as `null`.
    pub replica_id: Option<String>,
    pub replica_name: String,
    pub reason: CreateOrDropClusterReplicaReasonV1,
    // Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
}
271
/// Payload of [`EventDetails::DropClusterReplicaV3`].
///
/// Same fields as `DropClusterReplicaV2`, except that `scheduling_policies` uses
/// `SchedulingDecisionsWithReasonsV2`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct DropClusterReplicaV3 {
    pub cluster_id: String,
    pub cluster_name: String,
    // No `skip_serializing_if` here: a `None` is written out as `null`.
    pub replica_id: Option<String>,
    pub replica_name: String,
    pub reason: CreateOrDropClusterReplicaReasonV1,
    // Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
}
282
/// Payload of [`EventDetails::CreateClusterReplicaV1`], serialized under the historical
/// name `CreateComputeReplicaV1`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct CreateClusterReplicaV1 {
    pub cluster_id: String,
    pub cluster_name: String,
    // Events that predate v0.32.0 will not have this field set.
    // When `None`, the key is omitted from the serialized output entirely.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub replica_id: Option<String>,
    pub replica_name: String,
    pub logical_size: String,
    pub disk: bool,
    // NOTE(review): presumably an alternative size used for billing; confirm with the producer.
    pub billed_as: Option<String>,
    pub internal: bool,
}
296
/// Payload of [`EventDetails::CreateClusterReplicaV2`].
///
/// Compared to `CreateClusterReplicaV1`, this adds `reason` and the optional
/// `scheduling_policies`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct CreateClusterReplicaV2 {
    pub cluster_id: String,
    pub cluster_name: String,
    // No `skip_serializing_if` here: a `None` is written out as `null`.
    pub replica_id: Option<String>,
    pub replica_name: String,
    pub logical_size: String,
    pub disk: bool,
    pub billed_as: Option<String>,
    pub internal: bool,
    pub reason: CreateOrDropClusterReplicaReasonV1,
    // Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
}
311
/// Payload of [`EventDetails::CreateClusterReplicaV3`].
///
/// Same fields as `CreateClusterReplicaV2`, except that `scheduling_policies` uses
/// `SchedulingDecisionsWithReasonsV2`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct CreateClusterReplicaV3 {
    pub cluster_id: String,
    pub cluster_name: String,
    // No `skip_serializing_if` here: a `None` is written out as `null`.
    pub replica_id: Option<String>,
    pub replica_name: String,
    pub logical_size: String,
    pub disk: bool,
    pub billed_as: Option<String>,
    pub internal: bool,
    pub reason: CreateOrDropClusterReplicaReasonV1,
    // Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
}
326
/// Why a cluster replica was created or dropped.
///
/// Variants are (de)serialized in kebab-case (`"manual"`, `"schedule"`, `"system"`).
// NOTE(review): presumably `Manual` = user statement, `Schedule` = automated cluster
// scheduling, `System` = internal action; confirm against the producer.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
#[serde(rename_all = "kebab-case")]
pub enum CreateOrDropClusterReplicaReasonV1 {
    Manual,
    Schedule,
    System,
}
334
/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
/// can be settings of the policy as well as other information about the state of the system.)
///
/// This version records the refresh policy's decision as a `RefreshDecisionWithReasonV1`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct SchedulingDecisionsWithReasonsV1 {
    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
    pub on_refresh: RefreshDecisionWithReasonV1,
}
343
/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
/// can be settings of the policy as well as other information about the state of the system.)
///
/// This version records the refresh policy's decision as a `RefreshDecisionWithReasonV2`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct SchedulingDecisionsWithReasonsV2 {
    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
    pub on_refresh: RefreshDecisionWithReasonV2,
}
352
/// The refresh policy's On/Off decision, together with the inputs recorded as its reason.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct RefreshDecisionWithReasonV1 {
    /// Whether the policy wants the cluster On or Off.
    pub decision: SchedulingDecisionV1,
    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
    /// time estimate).
    pub objects_needing_refresh: Vec<String>,
    /// The HYDRATION TIME ESTIMATE setting of the cluster.
    pub hydration_time_estimate: String,
}
362
/// The refresh policy's On/Off decision, together with the inputs recorded as its reason.
///
/// Compared to `RefreshDecisionWithReasonV1`, this adds `objects_needing_compaction`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct RefreshDecisionWithReasonV2 {
    /// Whether the policy wants the cluster On or Off.
    pub decision: SchedulingDecisionV1,
    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
    /// time estimate), and therefore should keep the cluster On.
    pub objects_needing_refresh: Vec<String>,
    /// Objects for which we estimate that they currently need Persist compaction, and therefore
    /// should keep the cluster On.
    pub objects_needing_compaction: Vec<String>,
    /// The HYDRATION TIME ESTIMATE setting of the cluster.
    pub hydration_time_estimate: String,
}
375
/// A scheduling policy's opinion on whether a cluster should be On or Off.
///
/// Variants are (de)serialized in kebab-case (`"on"`, `"off"`).
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
#[serde(rename_all = "kebab-case")]
pub enum SchedulingDecisionV1 {
    On,
    Off,
}
382
383impl From<bool> for SchedulingDecisionV1 {
384    fn from(value: bool) -> Self {
385        match value {
386            true => SchedulingDecisionV1::On,
387            false => SchedulingDecisionV1::Off,
388        }
389    }
390}
391
/// Payload of [`EventDetails::CreateSourceSinkV1`]: the object's id, full name and
/// optional size.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct CreateSourceSinkV1 {
    pub id: String,
    // Flattened: `database`, `schema` and `item` are serialized alongside the other keys.
    #[serde(flatten)]
    pub name: FullNameV1,
    pub size: Option<String>,
}
399
/// Payload of [`EventDetails::CreateSourceSinkV2`].
///
/// Compared to `CreateSourceSinkV1`, this adds `external_type`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct CreateSourceSinkV2 {
    pub id: String,
    // Flattened: `database`, `schema` and `item` are serialized alongside the other keys.
    #[serde(flatten)]
    pub name: FullNameV1,
    pub size: Option<String>,
    // Serialized under the key `type` (a reserved word in Rust, hence the field rename).
    #[serde(rename = "type")]
    pub external_type: String,
}
409
/// Payload of [`EventDetails::CreateSourceSinkV3`].
///
/// Compared to `CreateSourceSinkV2`, this no longer has a `size` field.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct CreateSourceSinkV3 {
    pub id: String,
    // Flattened: `database`, `schema` and `item` are serialized alongside the other keys.
    #[serde(flatten)]
    pub name: FullNameV1,
    // Serialized under the key `type` (a reserved word in Rust, hence the field rename).
    #[serde(rename = "type")]
    pub external_type: String,
}
418
/// Payload of [`EventDetails::CreateSourceSinkV4`].
///
/// Compared to `CreateSourceSinkV3`, this adds the optional `cluster_id`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct CreateSourceSinkV4 {
    pub id: String,
    pub cluster_id: Option<String>,
    // Flattened: `database`, `schema` and `item` are serialized alongside the other keys.
    #[serde(flatten)]
    pub name: FullNameV1,
    // Serialized under the key `type` (a reserved word in Rust, hence the field rename).
    #[serde(rename = "type")]
    pub external_type: String,
}
428
/// Payload of [`EventDetails::CreateIndexV1`]: the index's id, cluster id and full name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct CreateIndexV1 {
    pub id: String,
    pub cluster_id: String,
    // Flattened: `database`, `schema` and `item` are serialized alongside the other keys.
    #[serde(flatten)]
    pub name: FullNameV1,
}
436
/// Payload of [`EventDetails::CreateMaterializedViewV1`]: the materialized view's id,
/// cluster id and full name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct CreateMaterializedViewV1 {
    pub id: String,
    pub cluster_id: String,
    // Flattened: `database`, `schema` and `item` are serialized alongside the other keys.
    #[serde(flatten)]
    pub name: FullNameV1,
}
444
/// Payload of [`EventDetails::AlterSourceSinkV1`]: the object's id and full name with its
/// old and new (optional) sizes.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct AlterSourceSinkV1 {
    pub id: String,
    // Flattened: `database`, `schema` and `item` are serialized alongside the other keys.
    #[serde(flatten)]
    pub name: FullNameV1,
    pub old_size: Option<String>,
    pub new_size: Option<String>,
}
453
/// Payload of [`EventDetails::AlterSetClusterV1`]: the object's id and full name with its
/// old and new (optional) clusters.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct AlterSetClusterV1 {
    pub id: String,
    // Flattened: `database`, `schema` and `item` are serialized alongside the other keys.
    #[serde(flatten)]
    pub name: FullNameV1,
    // NOTE(review): whether these hold cluster ids or cluster names is not visible here; confirm.
    pub old_cluster: Option<String>,
    pub new_cluster: Option<String>,
}
462
/// Payload of [`EventDetails::GrantRoleV1`]: the ids of the role, the member and the grantor.
// NOTE(review): presumably `role_id` is granted to `member_id` by `grantor_id`; confirm.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct GrantRoleV1 {
    pub role_id: String,
    pub member_id: String,
    pub grantor_id: String,
}
469
/// Payload of [`EventDetails::GrantRoleV2`].
///
/// Compared to `GrantRoleV1`, this adds `executed_by`.
// NOTE(review): presumably `executed_by` identifies who ran the statement, as distinct
// from the recorded grantor; confirm.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct GrantRoleV2 {
    pub role_id: String,
    pub member_id: String,
    pub grantor_id: String,
    pub executed_by: String,
}
477
/// Payload of [`EventDetails::RevokeRoleV1`]: the ids of the role and the member.
// NOTE(review): presumably `role_id` is revoked from `member_id`; confirm.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct RevokeRoleV1 {
    pub role_id: String,
    pub member_id: String,
}
483
/// Payload of [`EventDetails::RevokeRoleV2`].
///
/// Compared to `RevokeRoleV1`, this adds `grantor_id` and `executed_by`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct RevokeRoleV2 {
    pub role_id: String,
    pub member_id: String,
    pub grantor_id: String,
    pub executed_by: String,
}
491
/// Payload of [`EventDetails::UpdatePrivilegeV1`]: the object, grantee and grantor ids
/// together with the privileges involved.
// NOTE(review): the string format of `privileges` is not visible here; confirm with the producer.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct UpdatePrivilegeV1 {
    pub object_id: String,
    pub grantee_id: String,
    pub grantor_id: String,
    pub privileges: String,
}
499
/// Payload of [`EventDetails::AlterDefaultPrivilegeV1`]: the role and grantee ids, an
/// optional database and schema id, and the privileges involved.
// NOTE(review): presumably a `None` database/schema id means the default privilege is not
// restricted to a database/schema; confirm with the producer.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct AlterDefaultPrivilegeV1 {
    pub role_id: String,
    pub database_id: Option<String>,
    pub schema_id: Option<String>,
    pub grantee_id: String,
    pub privileges: String,
}
508
/// Payload of [`EventDetails::UpdateOwnerV1`]: an object id with its old and new owner ids.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct UpdateOwnerV1 {
    pub object_id: String,
    pub old_owner_id: String,
    pub new_owner_id: String,
}
515
/// Payload of [`EventDetails::SchemaV1`]: a schema's id and name plus the name of its
/// database.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct SchemaV1 {
    pub id: String,
    pub name: String,
    pub database_name: String,
}
522
/// Payload of [`EventDetails::SchemaV2`].
///
/// Compared to `SchemaV1`, `database_name` is optional.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct SchemaV2 {
    pub id: String,
    pub name: String,
    // NOTE(review): presumably `None` for schemas that do not belong to a database; confirm.
    pub database_name: Option<String>,
}
529
/// Payload of [`EventDetails::RenameSchemaV1`]: a schema id, its optional database name,
/// and its old and new names.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct RenameSchemaV1 {
    pub id: String,
    pub database_name: Option<String>,
    pub old_name: String,
    pub new_name: String,
}
537
/// Payload of [`EventDetails::AlterRetainHistoryV1`]: an object id with its old and new
/// (optional) history settings.
// NOTE(review): the string format of the history values is not visible here; confirm.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct AlterRetainHistoryV1 {
    pub id: String,
    pub old_history: Option<String>,
    pub new_history: Option<String>,
}
544
/// Payload of [`EventDetails::UpdateItemV1`]: an item's id and full name.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct UpdateItemV1 {
    pub id: String,
    // Flattened: `database`, `schema` and `item` are serialized alongside `id`.
    #[serde(flatten)]
    pub name: FullNameV1,
}
551
/// Payload of [`EventDetails::ToNewIdV1`]: an object id and the new id associated with it.
// NOTE(review): presumably the counterpart of `FromPreviousIdV1`, recorded from the old
// object's point of view; confirm with the producer.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct ToNewIdV1 {
    pub id: String,
    pub new_id: String,
}
557
/// Payload of [`EventDetails::FromPreviousIdV1`]: an object id and the previous id
/// associated with it.
// NOTE(review): presumably the counterpart of `ToNewIdV1`, recorded from the new object's
// point of view; confirm with the producer.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct FromPreviousIdV1 {
    pub id: String,
    pub previous_id: String,
}
563
564impl EventDetails {
565    pub fn as_json(&self) -> serde_json::Value {
566        match self {
567            EventDetails::CreateClusterReplicaV1(v) => {
568                serde_json::to_value(v).expect("must serialize")
569            }
570            EventDetails::CreateClusterReplicaV2(v) => {
571                serde_json::to_value(v).expect("must serialize")
572            }
573            EventDetails::CreateClusterReplicaV3(v) => {
574                serde_json::to_value(v).expect("must serialize")
575            }
576            EventDetails::DropClusterReplicaV1(v) => {
577                serde_json::to_value(v).expect("must serialize")
578            }
579            EventDetails::DropClusterReplicaV2(v) => {
580                serde_json::to_value(v).expect("must serialize")
581            }
582            EventDetails::DropClusterReplicaV3(v) => {
583                serde_json::to_value(v).expect("must serialize")
584            }
585            EventDetails::IdFullNameV1(v) => serde_json::to_value(v).expect("must serialize"),
586            EventDetails::RenameClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
587            EventDetails::RenameClusterReplicaV1(v) => {
588                serde_json::to_value(v).expect("must serialize")
589            }
590            EventDetails::RenameItemV1(v) => serde_json::to_value(v).expect("must serialize"),
591            EventDetails::IdNameV1(v) => serde_json::to_value(v).expect("must serialize"),
592            EventDetails::SchemaV1(v) => serde_json::to_value(v).expect("must serialize"),
593            EventDetails::SchemaV2(v) => serde_json::to_value(v).expect("must serialize"),
594            EventDetails::RenameSchemaV1(v) => serde_json::to_value(v).expect("must serialize"),
595            EventDetails::CreateSourceSinkV1(v) => serde_json::to_value(v).expect("must serialize"),
596            EventDetails::CreateSourceSinkV2(v) => serde_json::to_value(v).expect("must serialize"),
597            EventDetails::CreateSourceSinkV3(v) => serde_json::to_value(v).expect("must serialize"),
598            EventDetails::CreateSourceSinkV4(v) => serde_json::to_value(v).expect("must serialize"),
599            EventDetails::CreateIndexV1(v) => serde_json::to_value(v).expect("must serialize"),
600            EventDetails::CreateMaterializedViewV1(v) => {
601                serde_json::to_value(v).expect("must serialize")
602            }
603            EventDetails::AlterSourceSinkV1(v) => serde_json::to_value(v).expect("must serialize"),
604            EventDetails::AlterSetClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
605            EventDetails::GrantRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
606            EventDetails::GrantRoleV2(v) => serde_json::to_value(v).expect("must serialize"),
607            EventDetails::RevokeRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
608            EventDetails::RevokeRoleV2(v) => serde_json::to_value(v).expect("must serialize"),
609            EventDetails::UpdatePrivilegeV1(v) => serde_json::to_value(v).expect("must serialize"),
610            EventDetails::AlterDefaultPrivilegeV1(v) => {
611                serde_json::to_value(v).expect("must serialize")
612            }
613            EventDetails::UpdateOwnerV1(v) => serde_json::to_value(v).expect("must serialize"),
614            EventDetails::UpdateItemV1(v) => serde_json::to_value(v).expect("must serialize"),
615            EventDetails::AlterRetainHistoryV1(v) => {
616                serde_json::to_value(v).expect("must serialize")
617            }
618            EventDetails::ToNewIdV1(v) => serde_json::to_value(v).expect("must serialize"),
619            EventDetails::FromPreviousIdV1(v) => serde_json::to_value(v).expect("must serialize"),
620            EventDetails::SetV1(v) => serde_json::to_value(v).expect("must serialize"),
621            EventDetails::ResetAllV1 => serde_json::Value::Null,
622            EventDetails::RotateKeysV1(v) => serde_json::to_value(v).expect("must serialize"),
623        }
624    }
625}
626
/// Version 1 of the audit event schema, wrapped by `VersionedEvent::V1`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct EventV1 {
    /// Sortable id of the event; returned by `VersionedEvent::sortable_id`.
    pub id: u64,
    /// The kind of action that was performed.
    pub event_type: EventType,
    /// The kind of object the action was performed on.
    pub object_type: ObjectType,
    /// Event-specific payload.
    pub details: EventDetails,
    // Has no `skip_serializing_if`, so `None` is written out as `null`.
    pub user: Option<String>,
    /// When the event occurred, as an `EpochMillis` timestamp.
    pub occurred_at: EpochMillis,
}
636
637impl EventV1 {
638    fn new(
639        id: u64,
640        event_type: EventType,
641        object_type: ObjectType,
642        details: EventDetails,
643        user: Option<String>,
644        occurred_at: EpochMillis,
645    ) -> EventV1 {
646        EventV1 {
647            id,
648            event_type,
649            object_type,
650            details,
651            user,
652            occurred_at,
653        }
654    }
655}
656
/// Version 1 of a storage usage record, wrapped by `VersionedStorageUsage::V1`.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub struct StorageUsageV1 {
    /// Sortable id of the record; returned by `VersionedStorageUsage::sortable_id`.
    pub id: u64,
    // NOTE(review): the meaning of a `None` shard id is not visible here; confirm with the producer.
    pub shard_id: Option<String>,
    /// Size in bytes.
    pub size_bytes: u64,
    /// When the usage was collected, as an `EpochMillis` timestamp.
    pub collection_timestamp: EpochMillis,
}
664
665impl StorageUsageV1 {
666    pub fn new(
667        id: u64,
668        shard_id: Option<String>,
669        size_bytes: u64,
670        collection_timestamp: EpochMillis,
671    ) -> StorageUsageV1 {
672        StorageUsageV1 {
673            id,
674            shard_id,
675            size_bytes,
676            collection_timestamp,
677        }
678    }
679}
680
/// Describes the environment's storage usage at a point in time.
///
/// This type is persisted in the catalog across restarts, so any updates to the
/// schema will require a new version. With the derived serde impls the variant name
/// (e.g. `V1`) is part of the serialized form.
#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
pub enum VersionedStorageUsage {
    /// Version 1 of the storage usage schema.
    V1(StorageUsageV1),
}
689
690impl VersionedStorageUsage {
691    /// Create a new metric snapshot.
692    /// This function must always require and produce the most
693    /// recent variant of VersionedStorageMetrics.
694    pub fn new(
695        id: u64,
696        object_id: Option<String>,
697        size_bytes: u64,
698        collection_timestamp: EpochMillis,
699    ) -> Self {
700        Self::V1(StorageUsageV1::new(
701            id,
702            object_id,
703            size_bytes,
704            collection_timestamp,
705        ))
706    }
707
708    // Implement deserialize and serialize so writers and readers don't have to
709    // coordinate about which Serializer to use.
710    pub fn deserialize(data: &[u8]) -> Result<Self, anyhow::Error> {
711        Ok(serde_json::from_slice(data)?)
712    }
713
714    pub fn serialize(&self) -> Vec<u8> {
715        serde_json::to_vec(self).expect("must serialize")
716    }
717
718    pub fn timestamp(&self) -> EpochMillis {
719        match self {
720            VersionedStorageUsage::V1(StorageUsageV1 {
721                collection_timestamp,
722                ..
723            }) => *collection_timestamp,
724        }
725    }
726
727    /// Returns a globally sortable event order. All event versions must have this
728    /// field.
729    pub fn sortable_id(&self) -> u64 {
730        match self {
731            VersionedStorageUsage::V1(usage) => usage.id,
732        }
733    }
734}
735
#[cfg(test)]
mod tests {
    use crate::{EventDetails, EventType, EventV1, IdNameV1, ObjectType, VersionedEvent};

    // Test all versions of events. This test hard codes bytes so that
    // programmers are not able to change data structures here without this test
    // failing. Instead of changing data structures, add new variants.
    //
    // Both directions are pinned: the event must encode to exactly the stored
    // bytes, and the stored bytes must decode back to the same event, since
    // readers of previously written logs depend on the latter.
    #[mz_ore::test]
    fn test_audit_log() -> Result<(), anyhow::Error> {
        let cases: Vec<(VersionedEvent, &'static str)> = vec![(
            VersionedEvent::V1(EventV1::new(
                2,
                EventType::Drop,
                ObjectType::ClusterReplica,
                EventDetails::IdNameV1(IdNameV1 {
                    id: "u1".to_string(),
                    name: "name".into(),
                }),
                None,
                2,
            )),
            r#"{"V1":{"id":2,"event_type":"drop","object_type":"cluster-replica","details":{"IdNameV1":{"id":"u1","name":"name"}},"user":null,"occurred_at":2}}"#,
        )];

        for (event, expected_bytes) in cases {
            // Go through the public API that writers actually use.
            let event_bytes = event.serialize();
            assert_eq!(
                event_bytes,
                expected_bytes.as_bytes(),
                "expected bytes {}, got {}",
                expected_bytes,
                std::str::from_utf8(&event_bytes).unwrap(),
            );

            // Previously written bytes must still be readable and yield the same event.
            let decoded = VersionedEvent::deserialize(expected_bytes.as_bytes())?;
            assert_eq!(
                decoded, event,
                "stored bytes {} did not decode to the original event",
                expected_bytes,
            );
        }

        Ok(())
    }
}
773}