mz_audit_log/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Audit log data structures.
11//!
12//! The audit log is logging that is produced by user actions and consumed
13//! by users in the form of the `mz_catalog.mz_audit_events` SQL table and
14//! by the cloud management layer for billing and introspection. This crate
15//! is designed to make the production and consumption of the logs type
16//! safe. Events and their metadata are versioned and the data structures
17//! replicated here so that if the data change in some other crate, a
18//! new version here can be made. This avoids needing to poke at the data
19//! when reading it to determine what it means and should have full backward
20//! compatibility. This is its own crate so that production and consumption can
21//! be in different processes and production is not allowed to specify private
22//! data structures unknown to the reader.
23
24use mz_ore::now::EpochMillis;
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27
28/// New version variants should be added if fields need to be added, changed, or removed.
29#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
30pub enum VersionedEvent {
31    V1(EventV1),
32}
33
34impl VersionedEvent {
35    /// Create a new event. This function must always require and produce the most
36    /// recent variant of VersionedEvent. `id` must be a globally increasing,
37    /// ordered number such that sorting by it on all events yields the order
38    /// of events by users. It is insufficient to use `occurred_at` (even at
39    /// nanosecond precision) due to clock unpredictability.
40    pub fn new(
41        id: u64,
42        event_type: EventType,
43        object_type: ObjectType,
44        details: EventDetails,
45        user: Option<String>,
46        occurred_at: EpochMillis,
47    ) -> Self {
48        Self::V1(EventV1::new(
49            id,
50            event_type,
51            object_type,
52            details,
53            user,
54            occurred_at,
55        ))
56    }
57
58    // Implement deserialize and serialize so writers and readers don't have to
59    // coordinate about which Serializer to use.
60    pub fn deserialize(data: &[u8]) -> Result<Self, anyhow::Error> {
61        Ok(serde_json::from_slice(data)?)
62    }
63
64    pub fn serialize(&self) -> Vec<u8> {
65        serde_json::to_vec(self).expect("must serialize")
66    }
67
68    /// Returns a globally sortable event order. All event versions must have this
69    /// field.
70    pub fn sortable_id(&self) -> u64 {
71        match self {
72            VersionedEvent::V1(ev) => ev.id,
73        }
74    }
75}
76
77#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
78#[serde(rename_all = "kebab-case")]
79pub enum EventType {
80    Create,
81    Drop,
82    Alter,
83    Grant,
84    Revoke,
85    Comment,
86}
87
88impl EventType {
89    pub fn as_title_case(&self) -> &'static str {
90        match self {
91            EventType::Create => "Created",
92            EventType::Drop => "Dropped",
93            EventType::Alter => "Altered",
94            EventType::Grant => "Granted",
95            EventType::Revoke => "Revoked",
96            EventType::Comment => "Comment",
97        }
98    }
99}
100
101serde_plain::derive_display_from_serialize!(EventType);
102
103#[derive(
104    Clone, Copy, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary,
105)]
106#[serde(rename_all = "kebab-case")]
107pub enum ObjectType {
108    Cluster,
109    ClusterReplica,
110    Connection,
111    ContinualTask,
112    Database,
113    Func,
114    Index,
115    MaterializedView,
116    NetworkPolicy,
117    Role,
118    Secret,
119    Schema,
120    Sink,
121    Source,
122    System,
123    Table,
124    Type,
125    View,
126}
127
128impl ObjectType {
129    pub fn as_title_case(&self) -> &'static str {
130        match self {
131            ObjectType::Cluster => "Cluster",
132            ObjectType::ClusterReplica => "Cluster Replica",
133            ObjectType::Connection => "Connection",
134            ObjectType::ContinualTask => "Continual Task",
135            ObjectType::Database => "Database",
136            ObjectType::Func => "Function",
137            ObjectType::Index => "Index",
138            ObjectType::MaterializedView => "Materialized View",
139            ObjectType::NetworkPolicy => "Network Policy",
140            ObjectType::Role => "Role",
141            ObjectType::Schema => "Schema",
142            ObjectType::Secret => "Secret",
143            ObjectType::Sink => "Sink",
144            ObjectType::Source => "Source",
145            ObjectType::System => "System",
146            ObjectType::Table => "Table",
147            ObjectType::Type => "Type",
148            ObjectType::View => "View",
149        }
150    }
151}
152
153serde_plain::derive_display_from_serialize!(ObjectType);
154
155#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
156pub enum EventDetails {
157    #[serde(rename = "CreateComputeReplicaV1")] // historical name
158    CreateClusterReplicaV1(CreateClusterReplicaV1),
159    CreateClusterReplicaV2(CreateClusterReplicaV2),
160    CreateClusterReplicaV3(CreateClusterReplicaV3),
161    CreateClusterReplicaV4(CreateClusterReplicaV4),
162    #[serde(rename = "DropComputeReplicaV1")] // historical name
163    DropClusterReplicaV1(DropClusterReplicaV1),
164    DropClusterReplicaV2(DropClusterReplicaV2),
165    DropClusterReplicaV3(DropClusterReplicaV3),
166    CreateSourceSinkV1(CreateSourceSinkV1),
167    CreateSourceSinkV2(CreateSourceSinkV2),
168    CreateSourceSinkV3(CreateSourceSinkV3),
169    CreateSourceSinkV4(CreateSourceSinkV4),
170    CreateIndexV1(CreateIndexV1),
171    CreateMaterializedViewV1(CreateMaterializedViewV1),
172    AlterSetClusterV1(AlterSetClusterV1),
173    AlterSourceSinkV1(AlterSourceSinkV1),
174    GrantRoleV1(GrantRoleV1),
175    GrantRoleV2(GrantRoleV2),
176    RevokeRoleV1(RevokeRoleV1),
177    RevokeRoleV2(RevokeRoleV2),
178    UpdatePrivilegeV1(UpdatePrivilegeV1),
179    AlterDefaultPrivilegeV1(AlterDefaultPrivilegeV1),
180    UpdateOwnerV1(UpdateOwnerV1),
181    IdFullNameV1(IdFullNameV1),
182    RenameClusterV1(RenameClusterV1),
183    RenameClusterReplicaV1(RenameClusterReplicaV1),
184    RenameItemV1(RenameItemV1),
185    IdNameV1(IdNameV1),
186    SchemaV1(SchemaV1),
187    SchemaV2(SchemaV2),
188    UpdateItemV1(UpdateItemV1),
189    RenameSchemaV1(RenameSchemaV1),
190    AlterRetainHistoryV1(AlterRetainHistoryV1),
191    ToNewIdV1(ToNewIdV1),
192    FromPreviousIdV1(FromPreviousIdV1),
193    SetV1(SetV1),
194    ResetAllV1,
195    RotateKeysV1(RotateKeysV1),
196}
197
198#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
199pub struct SetV1 {
200    pub name: String,
201    pub value: Option<String>,
202}
203
204#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
205pub struct RotateKeysV1 {
206    pub id: String,
207    pub name: String,
208}
209
210#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
211pub struct IdFullNameV1 {
212    pub id: String,
213    #[serde(flatten)]
214    pub name: FullNameV1,
215}
216
217#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
218pub struct FullNameV1 {
219    pub database: String,
220    pub schema: String,
221    pub item: String,
222}
223
224#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
225pub struct IdNameV1 {
226    pub id: String,
227    pub name: String,
228}
229
230#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
231pub struct RenameItemV1 {
232    pub id: String,
233    pub old_name: FullNameV1,
234    pub new_name: FullNameV1,
235}
236
237#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
238pub struct RenameClusterV1 {
239    pub id: String,
240    pub old_name: String,
241    pub new_name: String,
242}
243
244#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
245pub struct RenameClusterReplicaV1 {
246    pub cluster_id: String,
247    pub replica_id: String,
248    pub old_name: String,
249    pub new_name: String,
250}
251
252#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
253pub struct DropClusterReplicaV1 {
254    pub cluster_id: String,
255    pub cluster_name: String,
256    // Events that predate v0.32.0 will not have this field set.
257    #[serde(skip_serializing_if = "Option::is_none")]
258    pub replica_id: Option<String>,
259    pub replica_name: String,
260}
261
262#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
263pub struct DropClusterReplicaV2 {
264    pub cluster_id: String,
265    pub cluster_name: String,
266    pub replica_id: Option<String>,
267    pub replica_name: String,
268    pub reason: CreateOrDropClusterReplicaReasonV1,
269    #[serde(skip_serializing_if = "Option::is_none")]
270    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
271}
272
273#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
274pub struct DropClusterReplicaV3 {
275    pub cluster_id: String,
276    pub cluster_name: String,
277    pub replica_id: Option<String>,
278    pub replica_name: String,
279    pub reason: CreateOrDropClusterReplicaReasonV1,
280    #[serde(skip_serializing_if = "Option::is_none")]
281    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
282}
283
284#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
285pub struct CreateClusterReplicaV1 {
286    pub cluster_id: String,
287    pub cluster_name: String,
288    // Events that predate v0.32.0 will not have this field set.
289    #[serde(skip_serializing_if = "Option::is_none")]
290    pub replica_id: Option<String>,
291    pub replica_name: String,
292    pub logical_size: String,
293    pub disk: bool,
294    pub billed_as: Option<String>,
295    pub internal: bool,
296}
297
298#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
299pub struct CreateClusterReplicaV2 {
300    pub cluster_id: String,
301    pub cluster_name: String,
302    pub replica_id: Option<String>,
303    pub replica_name: String,
304    pub logical_size: String,
305    pub disk: bool,
306    pub billed_as: Option<String>,
307    pub internal: bool,
308    pub reason: CreateOrDropClusterReplicaReasonV1,
309    #[serde(skip_serializing_if = "Option::is_none")]
310    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
311}
312
313#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
314pub struct CreateClusterReplicaV3 {
315    pub cluster_id: String,
316    pub cluster_name: String,
317    pub replica_id: Option<String>,
318    pub replica_name: String,
319    pub logical_size: String,
320    pub disk: bool,
321    pub billed_as: Option<String>,
322    pub internal: bool,
323    pub reason: CreateOrDropClusterReplicaReasonV1,
324    #[serde(skip_serializing_if = "Option::is_none")]
325    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
326}
327
328#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
329pub struct CreateClusterReplicaV4 {
330    pub cluster_id: String,
331    pub cluster_name: String,
332    pub replica_id: Option<String>,
333    pub replica_name: String,
334    pub logical_size: String,
335    pub billed_as: Option<String>,
336    pub internal: bool,
337    pub reason: CreateOrDropClusterReplicaReasonV1,
338    #[serde(skip_serializing_if = "Option::is_none")]
339    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
340}
341
342#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
343#[serde(rename_all = "kebab-case")]
344pub enum CreateOrDropClusterReplicaReasonV1 {
345    Manual,
346    Schedule,
347    System,
348}
349
350/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
351/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
352/// can be settings of the policy as well as other information about the state of the system.)
353#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
354pub struct SchedulingDecisionsWithReasonsV1 {
355    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
356    pub on_refresh: RefreshDecisionWithReasonV1,
357}
358
359/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
360/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
361/// can be settings of the policy as well as other information about the state of the system.)
362#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
363pub struct SchedulingDecisionsWithReasonsV2 {
364    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
365    pub on_refresh: RefreshDecisionWithReasonV2,
366}
367
368#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
369pub struct RefreshDecisionWithReasonV1 {
370    pub decision: SchedulingDecisionV1,
371    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
372    /// time estimate).
373    pub objects_needing_refresh: Vec<String>,
374    /// The HYDRATION TIME ESTIMATE setting of the cluster.
375    pub hydration_time_estimate: String,
376}
377
378#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
379pub struct RefreshDecisionWithReasonV2 {
380    pub decision: SchedulingDecisionV1,
381    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
382    /// time estimate), and therefore should keep the cluster On.
383    pub objects_needing_refresh: Vec<String>,
384    /// Objects for which we estimate that they currently need Persist compaction, and therefore
385    /// should keep the cluster On.
386    pub objects_needing_compaction: Vec<String>,
387    /// The HYDRATION TIME ESTIMATE setting of the cluster.
388    pub hydration_time_estimate: String,
389}
390
391#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
392#[serde(rename_all = "kebab-case")]
393pub enum SchedulingDecisionV1 {
394    On,
395    Off,
396}
397
398impl From<bool> for SchedulingDecisionV1 {
399    fn from(value: bool) -> Self {
400        match value {
401            true => SchedulingDecisionV1::On,
402            false => SchedulingDecisionV1::Off,
403        }
404    }
405}
406
407#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
408pub struct CreateSourceSinkV1 {
409    pub id: String,
410    #[serde(flatten)]
411    pub name: FullNameV1,
412    pub size: Option<String>,
413}
414
415#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
416pub struct CreateSourceSinkV2 {
417    pub id: String,
418    #[serde(flatten)]
419    pub name: FullNameV1,
420    pub size: Option<String>,
421    #[serde(rename = "type")]
422    pub external_type: String,
423}
424
425#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
426pub struct CreateSourceSinkV3 {
427    pub id: String,
428    #[serde(flatten)]
429    pub name: FullNameV1,
430    #[serde(rename = "type")]
431    pub external_type: String,
432}
433
434#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
435pub struct CreateSourceSinkV4 {
436    pub id: String,
437    pub cluster_id: Option<String>,
438    #[serde(flatten)]
439    pub name: FullNameV1,
440    #[serde(rename = "type")]
441    pub external_type: String,
442}
443
444#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
445pub struct CreateIndexV1 {
446    pub id: String,
447    pub cluster_id: String,
448    #[serde(flatten)]
449    pub name: FullNameV1,
450}
451
452#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
453pub struct CreateMaterializedViewV1 {
454    pub id: String,
455    pub cluster_id: String,
456    #[serde(flatten)]
457    pub name: FullNameV1,
458}
459
460#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
461pub struct AlterSourceSinkV1 {
462    pub id: String,
463    #[serde(flatten)]
464    pub name: FullNameV1,
465    pub old_size: Option<String>,
466    pub new_size: Option<String>,
467}
468
469#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
470pub struct AlterSetClusterV1 {
471    pub id: String,
472    #[serde(flatten)]
473    pub name: FullNameV1,
474    pub old_cluster: Option<String>,
475    pub new_cluster: Option<String>,
476}
477
478#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
479pub struct GrantRoleV1 {
480    pub role_id: String,
481    pub member_id: String,
482    pub grantor_id: String,
483}
484
485#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
486pub struct GrantRoleV2 {
487    pub role_id: String,
488    pub member_id: String,
489    pub grantor_id: String,
490    pub executed_by: String,
491}
492
493#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
494pub struct RevokeRoleV1 {
495    pub role_id: String,
496    pub member_id: String,
497}
498
499#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
500pub struct RevokeRoleV2 {
501    pub role_id: String,
502    pub member_id: String,
503    pub grantor_id: String,
504    pub executed_by: String,
505}
506
507#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
508pub struct UpdatePrivilegeV1 {
509    pub object_id: String,
510    pub grantee_id: String,
511    pub grantor_id: String,
512    pub privileges: String,
513}
514
515#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
516pub struct AlterDefaultPrivilegeV1 {
517    pub role_id: String,
518    pub database_id: Option<String>,
519    pub schema_id: Option<String>,
520    pub grantee_id: String,
521    pub privileges: String,
522}
523
524#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
525pub struct UpdateOwnerV1 {
526    pub object_id: String,
527    pub old_owner_id: String,
528    pub new_owner_id: String,
529}
530
531#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
532pub struct SchemaV1 {
533    pub id: String,
534    pub name: String,
535    pub database_name: String,
536}
537
538#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
539pub struct SchemaV2 {
540    pub id: String,
541    pub name: String,
542    pub database_name: Option<String>,
543}
544
545#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
546pub struct RenameSchemaV1 {
547    pub id: String,
548    pub database_name: Option<String>,
549    pub old_name: String,
550    pub new_name: String,
551}
552
553#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
554pub struct AlterRetainHistoryV1 {
555    pub id: String,
556    pub old_history: Option<String>,
557    pub new_history: Option<String>,
558}
559
560#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
561pub struct UpdateItemV1 {
562    pub id: String,
563    #[serde(flatten)]
564    pub name: FullNameV1,
565}
566
567#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
568pub struct ToNewIdV1 {
569    pub id: String,
570    pub new_id: String,
571}
572
573#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
574pub struct FromPreviousIdV1 {
575    pub id: String,
576    pub previous_id: String,
577}
578
579impl EventDetails {
580    pub fn as_json(&self) -> serde_json::Value {
581        match self {
582            EventDetails::CreateClusterReplicaV1(v) => {
583                serde_json::to_value(v).expect("must serialize")
584            }
585            EventDetails::CreateClusterReplicaV2(v) => {
586                serde_json::to_value(v).expect("must serialize")
587            }
588            EventDetails::CreateClusterReplicaV3(v) => {
589                serde_json::to_value(v).expect("must serialize")
590            }
591            EventDetails::CreateClusterReplicaV4(v) => {
592                serde_json::to_value(v).expect("must serialize")
593            }
594            EventDetails::DropClusterReplicaV1(v) => {
595                serde_json::to_value(v).expect("must serialize")
596            }
597            EventDetails::DropClusterReplicaV2(v) => {
598                serde_json::to_value(v).expect("must serialize")
599            }
600            EventDetails::DropClusterReplicaV3(v) => {
601                serde_json::to_value(v).expect("must serialize")
602            }
603            EventDetails::IdFullNameV1(v) => serde_json::to_value(v).expect("must serialize"),
604            EventDetails::RenameClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
605            EventDetails::RenameClusterReplicaV1(v) => {
606                serde_json::to_value(v).expect("must serialize")
607            }
608            EventDetails::RenameItemV1(v) => serde_json::to_value(v).expect("must serialize"),
609            EventDetails::IdNameV1(v) => serde_json::to_value(v).expect("must serialize"),
610            EventDetails::SchemaV1(v) => serde_json::to_value(v).expect("must serialize"),
611            EventDetails::SchemaV2(v) => serde_json::to_value(v).expect("must serialize"),
612            EventDetails::RenameSchemaV1(v) => serde_json::to_value(v).expect("must serialize"),
613            EventDetails::CreateSourceSinkV1(v) => serde_json::to_value(v).expect("must serialize"),
614            EventDetails::CreateSourceSinkV2(v) => serde_json::to_value(v).expect("must serialize"),
615            EventDetails::CreateSourceSinkV3(v) => serde_json::to_value(v).expect("must serialize"),
616            EventDetails::CreateSourceSinkV4(v) => serde_json::to_value(v).expect("must serialize"),
617            EventDetails::CreateIndexV1(v) => serde_json::to_value(v).expect("must serialize"),
618            EventDetails::CreateMaterializedViewV1(v) => {
619                serde_json::to_value(v).expect("must serialize")
620            }
621            EventDetails::AlterSourceSinkV1(v) => serde_json::to_value(v).expect("must serialize"),
622            EventDetails::AlterSetClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
623            EventDetails::GrantRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
624            EventDetails::GrantRoleV2(v) => serde_json::to_value(v).expect("must serialize"),
625            EventDetails::RevokeRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
626            EventDetails::RevokeRoleV2(v) => serde_json::to_value(v).expect("must serialize"),
627            EventDetails::UpdatePrivilegeV1(v) => serde_json::to_value(v).expect("must serialize"),
628            EventDetails::AlterDefaultPrivilegeV1(v) => {
629                serde_json::to_value(v).expect("must serialize")
630            }
631            EventDetails::UpdateOwnerV1(v) => serde_json::to_value(v).expect("must serialize"),
632            EventDetails::UpdateItemV1(v) => serde_json::to_value(v).expect("must serialize"),
633            EventDetails::AlterRetainHistoryV1(v) => {
634                serde_json::to_value(v).expect("must serialize")
635            }
636            EventDetails::ToNewIdV1(v) => serde_json::to_value(v).expect("must serialize"),
637            EventDetails::FromPreviousIdV1(v) => serde_json::to_value(v).expect("must serialize"),
638            EventDetails::SetV1(v) => serde_json::to_value(v).expect("must serialize"),
639            EventDetails::ResetAllV1 => serde_json::Value::Null,
640            EventDetails::RotateKeysV1(v) => serde_json::to_value(v).expect("must serialize"),
641        }
642    }
643}
644
645#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
646pub struct EventV1 {
647    pub id: u64,
648    pub event_type: EventType,
649    pub object_type: ObjectType,
650    pub details: EventDetails,
651    pub user: Option<String>,
652    pub occurred_at: EpochMillis,
653}
654
655impl EventV1 {
656    fn new(
657        id: u64,
658        event_type: EventType,
659        object_type: ObjectType,
660        details: EventDetails,
661        user: Option<String>,
662        occurred_at: EpochMillis,
663    ) -> EventV1 {
664        EventV1 {
665            id,
666            event_type,
667            object_type,
668            details,
669            user,
670            occurred_at,
671        }
672    }
673}
674
675#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
676pub struct StorageUsageV1 {
677    pub id: u64,
678    pub shard_id: Option<String>,
679    pub size_bytes: u64,
680    pub collection_timestamp: EpochMillis,
681}
682
683impl StorageUsageV1 {
684    pub fn new(
685        id: u64,
686        shard_id: Option<String>,
687        size_bytes: u64,
688        collection_timestamp: EpochMillis,
689    ) -> StorageUsageV1 {
690        StorageUsageV1 {
691            id,
692            shard_id,
693            size_bytes,
694            collection_timestamp,
695        }
696    }
697}
698
699/// Describes the environment's storage usage at a point in time.
700///
701/// This type is persisted in the catalog across restarts, so any updates to the
702/// schema will require a new version.
703#[derive(Clone, Debug, Serialize, Deserialize, PartialOrd, PartialEq, Eq, Ord, Hash, Arbitrary)]
704pub enum VersionedStorageUsage {
705    V1(StorageUsageV1),
706}
707
708impl VersionedStorageUsage {
709    /// Create a new metric snapshot.
710    /// This function must always require and produce the most
711    /// recent variant of VersionedStorageMetrics.
712    pub fn new(
713        id: u64,
714        object_id: Option<String>,
715        size_bytes: u64,
716        collection_timestamp: EpochMillis,
717    ) -> Self {
718        Self::V1(StorageUsageV1::new(
719            id,
720            object_id,
721            size_bytes,
722            collection_timestamp,
723        ))
724    }
725
726    // Implement deserialize and serialize so writers and readers don't have to
727    // coordinate about which Serializer to use.
728    pub fn deserialize(data: &[u8]) -> Result<Self, anyhow::Error> {
729        Ok(serde_json::from_slice(data)?)
730    }
731
732    pub fn serialize(&self) -> Vec<u8> {
733        serde_json::to_vec(self).expect("must serialize")
734    }
735
736    pub fn timestamp(&self) -> EpochMillis {
737        match self {
738            VersionedStorageUsage::V1(StorageUsageV1 {
739                collection_timestamp,
740                ..
741            }) => *collection_timestamp,
742        }
743    }
744
745    /// Returns a globally sortable event order. All event versions must have this
746    /// field.
747    pub fn sortable_id(&self) -> u64 {
748        match self {
749            VersionedStorageUsage::V1(usage) => usage.id,
750        }
751    }
752}
753
754#[cfg(test)]
755mod tests {
756    use crate::{EventDetails, EventType, EventV1, IdNameV1, ObjectType, VersionedEvent};
757
758    // Test all versions of events. This test hard codes bytes so that
759    // programmers are not able to change data structures here without this test
760    // failing. Instead of changing data structures, add new variants.
761    #[mz_ore::test]
762    fn test_audit_log() -> Result<(), anyhow::Error> {
763        let cases: Vec<(VersionedEvent, &'static str)> = vec![(
764            VersionedEvent::V1(EventV1::new(
765                2,
766                EventType::Drop,
767                ObjectType::ClusterReplica,
768                EventDetails::IdNameV1(IdNameV1 {
769                    id: "u1".to_string(),
770                    name: "name".into(),
771                }),
772                None,
773                2,
774            )),
775            r#"{"V1":{"id":2,"event_type":"drop","object_type":"cluster-replica","details":{"IdNameV1":{"id":"u1","name":"name"}},"user":null,"occurred_at":2}}"#,
776        )];
777
778        for (event, expected_bytes) in cases {
779            let event_bytes = serde_json::to_vec(&event).unwrap();
780            assert_eq!(
781                event_bytes,
782                expected_bytes.as_bytes(),
783                "expected bytes {}, got {}",
784                expected_bytes,
785                std::str::from_utf8(&event_bytes).unwrap(),
786            );
787        }
788
789        Ok(())
790    }
791}