Skip to main content

mz_audit_log/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Audit log data structures.
11//!
12//! The audit log is logging that is produced by user actions and consumed
13//! by users in the form of the `mz_catalog.mz_audit_events` SQL table and
14//! by the cloud management layer for billing and introspection. This crate
15//! is designed to make the production and consumption of the logs type
16//! safe. Events and their metadata are versioned and the data structures
17//! replicated here so that if the data change in some other crate, a
18//! new version here can be made. This avoids needing to poke at the data
19//! when reading it to determine what it means and should have full backward
20//! compatibility. This is its own crate so that production and consumption can
21//! be in different processes and production is not allowed to specify private
22//! data structures unknown to the reader.
23
24use mz_ore::now::EpochMillis;
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27
28/// New version variants should be added if fields need to be added, changed, or removed.
29#[derive(
30    Clone,
31    Debug,
32    Serialize,
33    Deserialize,
34    PartialOrd,
35    PartialEq,
36    Eq,
37    Ord,
38    Hash,
39    Arbitrary
40)]
41pub enum VersionedEvent {
42    V1(EventV1),
43}
44
45impl VersionedEvent {
46    /// Create a new event. This function must always require and produce the most
47    /// recent variant of VersionedEvent. `id` must be a globally increasing,
48    /// ordered number such that sorting by it on all events yields the order
49    /// of events by users. It is insufficient to use `occurred_at` (even at
50    /// nanosecond precision) due to clock unpredictability.
51    pub fn new(
52        id: u64,
53        event_type: EventType,
54        object_type: ObjectType,
55        details: EventDetails,
56        user: Option<String>,
57        occurred_at: EpochMillis,
58    ) -> Self {
59        Self::V1(EventV1::new(
60            id,
61            event_type,
62            object_type,
63            details,
64            user,
65            occurred_at,
66        ))
67    }
68
69    // Implement deserialize and serialize so writers and readers don't have to
70    // coordinate about which Serializer to use.
71    pub fn deserialize(data: &[u8]) -> Result<Self, anyhow::Error> {
72        Ok(serde_json::from_slice(data)?)
73    }
74
75    pub fn serialize(&self) -> Vec<u8> {
76        serde_json::to_vec(self).expect("must serialize")
77    }
78
79    /// Returns a globally sortable event order. All event versions must have this
80    /// field.
81    pub fn sortable_id(&self) -> u64 {
82        match self {
83            VersionedEvent::V1(ev) => ev.id,
84        }
85    }
86}
87
88#[derive(
89    Clone,
90    Debug,
91    Serialize,
92    Deserialize,
93    PartialOrd,
94    PartialEq,
95    Eq,
96    Ord,
97    Hash,
98    Arbitrary
99)]
100#[serde(rename_all = "kebab-case")]
101pub enum EventType {
102    Create,
103    Drop,
104    Alter,
105    Grant,
106    Revoke,
107    Comment,
108}
109
110impl EventType {
111    pub fn as_title_case(&self) -> &'static str {
112        match self {
113            EventType::Create => "Created",
114            EventType::Drop => "Dropped",
115            EventType::Alter => "Altered",
116            EventType::Grant => "Granted",
117            EventType::Revoke => "Revoked",
118            EventType::Comment => "Comment",
119        }
120    }
121}
122
123serde_plain::derive_display_from_serialize!(EventType);
124
125#[derive(
126    Clone,
127    Copy,
128    Debug,
129    Serialize,
130    Deserialize,
131    PartialOrd,
132    PartialEq,
133    Eq,
134    Ord,
135    Hash,
136    Arbitrary
137)]
138#[serde(rename_all = "kebab-case")]
139pub enum ObjectType {
140    Cluster,
141    ClusterReplica,
142    Connection,
143    ContinualTask,
144    Database,
145    Func,
146    Index,
147    MaterializedView,
148    NetworkPolicy,
149    Role,
150    Secret,
151    Schema,
152    Sink,
153    Source,
154    System,
155    Table,
156    Type,
157    View,
158}
159
160impl ObjectType {
161    pub fn as_title_case(&self) -> &'static str {
162        match self {
163            ObjectType::Cluster => "Cluster",
164            ObjectType::ClusterReplica => "Cluster Replica",
165            ObjectType::Connection => "Connection",
166            ObjectType::ContinualTask => "Continual Task",
167            ObjectType::Database => "Database",
168            ObjectType::Func => "Function",
169            ObjectType::Index => "Index",
170            ObjectType::MaterializedView => "Materialized View",
171            ObjectType::NetworkPolicy => "Network Policy",
172            ObjectType::Role => "Role",
173            ObjectType::Schema => "Schema",
174            ObjectType::Secret => "Secret",
175            ObjectType::Sink => "Sink",
176            ObjectType::Source => "Source",
177            ObjectType::System => "System",
178            ObjectType::Table => "Table",
179            ObjectType::Type => "Type",
180            ObjectType::View => "View",
181        }
182    }
183}
184
185serde_plain::derive_display_from_serialize!(ObjectType);
186
187#[derive(
188    Clone,
189    Debug,
190    Serialize,
191    Deserialize,
192    PartialOrd,
193    PartialEq,
194    Eq,
195    Ord,
196    Hash,
197    Arbitrary
198)]
199pub enum EventDetails {
200    #[serde(rename = "CreateComputeReplicaV1")] // historical name
201    CreateClusterReplicaV1(CreateClusterReplicaV1),
202    CreateClusterReplicaV2(CreateClusterReplicaV2),
203    CreateClusterReplicaV3(CreateClusterReplicaV3),
204    CreateClusterReplicaV4(CreateClusterReplicaV4),
205    #[serde(rename = "DropComputeReplicaV1")] // historical name
206    DropClusterReplicaV1(DropClusterReplicaV1),
207    DropClusterReplicaV2(DropClusterReplicaV2),
208    DropClusterReplicaV3(DropClusterReplicaV3),
209    CreateSourceSinkV1(CreateSourceSinkV1),
210    CreateSourceSinkV2(CreateSourceSinkV2),
211    CreateSourceSinkV3(CreateSourceSinkV3),
212    CreateSourceSinkV4(CreateSourceSinkV4),
213    CreateIndexV1(CreateIndexV1),
214    CreateMaterializedViewV1(CreateMaterializedViewV1),
215    AlterApplyReplacementV1(AlterApplyReplacementV1),
216    AlterSetClusterV1(AlterSetClusterV1),
217    AlterSourceSinkV1(AlterSourceSinkV1),
218    GrantRoleV1(GrantRoleV1),
219    GrantRoleV2(GrantRoleV2),
220    RevokeRoleV1(RevokeRoleV1),
221    RevokeRoleV2(RevokeRoleV2),
222    UpdatePrivilegeV1(UpdatePrivilegeV1),
223    AlterDefaultPrivilegeV1(AlterDefaultPrivilegeV1),
224    UpdateOwnerV1(UpdateOwnerV1),
225    IdFullNameV1(IdFullNameV1),
226    RenameClusterV1(RenameClusterV1),
227    RenameClusterReplicaV1(RenameClusterReplicaV1),
228    RenameItemV1(RenameItemV1),
229    IdNameV1(IdNameV1),
230    SchemaV1(SchemaV1),
231    SchemaV2(SchemaV2),
232    UpdateItemV1(UpdateItemV1),
233    RenameSchemaV1(RenameSchemaV1),
234    AlterRetainHistoryV1(AlterRetainHistoryV1),
235    ToNewIdV1(ToNewIdV1),
236    FromPreviousIdV1(FromPreviousIdV1),
237    SetV1(SetV1),
238    ResetAllV1,
239    RotateKeysV1(RotateKeysV1),
240    CreateRoleV1(CreateRoleV1),
241}
242
243#[derive(
244    Clone,
245    Debug,
246    Serialize,
247    Deserialize,
248    PartialOrd,
249    PartialEq,
250    Eq,
251    Ord,
252    Hash,
253    Arbitrary
254)]
255pub struct SetV1 {
256    pub name: String,
257    pub value: Option<String>,
258}
259
260#[derive(
261    Clone,
262    Debug,
263    Serialize,
264    Deserialize,
265    PartialOrd,
266    PartialEq,
267    Eq,
268    Ord,
269    Hash,
270    Arbitrary
271)]
272pub struct RotateKeysV1 {
273    pub id: String,
274    pub name: String,
275}
276
277#[derive(
278    Clone,
279    Debug,
280    Serialize,
281    Deserialize,
282    PartialOrd,
283    PartialEq,
284    Eq,
285    Ord,
286    Hash,
287    Arbitrary
288)]
289pub struct IdFullNameV1 {
290    pub id: String,
291    #[serde(flatten)]
292    pub name: FullNameV1,
293}
294
295#[derive(
296    Clone,
297    Debug,
298    Serialize,
299    Deserialize,
300    PartialOrd,
301    PartialEq,
302    Eq,
303    Ord,
304    Hash,
305    Arbitrary
306)]
307pub struct FullNameV1 {
308    pub database: String,
309    pub schema: String,
310    pub item: String,
311}
312
313#[derive(
314    Clone,
315    Debug,
316    Serialize,
317    Deserialize,
318    PartialOrd,
319    PartialEq,
320    Eq,
321    Ord,
322    Hash,
323    Arbitrary
324)]
325pub struct IdNameV1 {
326    pub id: String,
327    pub name: String,
328}
329
330#[derive(
331    Clone,
332    Debug,
333    Serialize,
334    Deserialize,
335    PartialOrd,
336    PartialEq,
337    Eq,
338    Ord,
339    Hash,
340    Arbitrary
341)]
342pub struct CreateRoleV1 {
343    pub id: String,
344    pub name: String,
345    pub auto_provision_source: Option<String>,
346}
347
348#[derive(
349    Clone,
350    Debug,
351    Serialize,
352    Deserialize,
353    PartialOrd,
354    PartialEq,
355    Eq,
356    Ord,
357    Hash,
358    Arbitrary
359)]
360pub struct RenameItemV1 {
361    pub id: String,
362    pub old_name: FullNameV1,
363    pub new_name: FullNameV1,
364}
365
366#[derive(
367    Clone,
368    Debug,
369    Serialize,
370    Deserialize,
371    PartialOrd,
372    PartialEq,
373    Eq,
374    Ord,
375    Hash,
376    Arbitrary
377)]
378pub struct RenameClusterV1 {
379    pub id: String,
380    pub old_name: String,
381    pub new_name: String,
382}
383
384#[derive(
385    Clone,
386    Debug,
387    Serialize,
388    Deserialize,
389    PartialOrd,
390    PartialEq,
391    Eq,
392    Ord,
393    Hash,
394    Arbitrary
395)]
396pub struct RenameClusterReplicaV1 {
397    pub cluster_id: String,
398    pub replica_id: String,
399    pub old_name: String,
400    pub new_name: String,
401}
402
403#[derive(
404    Clone,
405    Debug,
406    Serialize,
407    Deserialize,
408    PartialOrd,
409    PartialEq,
410    Eq,
411    Ord,
412    Hash,
413    Arbitrary
414)]
415pub struct DropClusterReplicaV1 {
416    pub cluster_id: String,
417    pub cluster_name: String,
418    // Events that predate v0.32.0 will not have this field set.
419    #[serde(skip_serializing_if = "Option::is_none")]
420    pub replica_id: Option<String>,
421    pub replica_name: String,
422}
423
424#[derive(
425    Clone,
426    Debug,
427    Serialize,
428    Deserialize,
429    PartialOrd,
430    PartialEq,
431    Eq,
432    Ord,
433    Hash,
434    Arbitrary
435)]
436pub struct DropClusterReplicaV2 {
437    pub cluster_id: String,
438    pub cluster_name: String,
439    pub replica_id: Option<String>,
440    pub replica_name: String,
441    pub reason: CreateOrDropClusterReplicaReasonV1,
442    #[serde(skip_serializing_if = "Option::is_none")]
443    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
444}
445
446#[derive(
447    Clone,
448    Debug,
449    Serialize,
450    Deserialize,
451    PartialOrd,
452    PartialEq,
453    Eq,
454    Ord,
455    Hash,
456    Arbitrary
457)]
458pub struct DropClusterReplicaV3 {
459    pub cluster_id: String,
460    pub cluster_name: String,
461    pub replica_id: Option<String>,
462    pub replica_name: String,
463    pub reason: CreateOrDropClusterReplicaReasonV1,
464    #[serde(skip_serializing_if = "Option::is_none")]
465    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
466}
467
468#[derive(
469    Clone,
470    Debug,
471    Serialize,
472    Deserialize,
473    PartialOrd,
474    PartialEq,
475    Eq,
476    Ord,
477    Hash,
478    Arbitrary
479)]
480pub struct CreateClusterReplicaV1 {
481    pub cluster_id: String,
482    pub cluster_name: String,
483    // Events that predate v0.32.0 will not have this field set.
484    #[serde(skip_serializing_if = "Option::is_none")]
485    pub replica_id: Option<String>,
486    pub replica_name: String,
487    pub logical_size: String,
488    pub disk: bool,
489    pub billed_as: Option<String>,
490    pub internal: bool,
491}
492
493#[derive(
494    Clone,
495    Debug,
496    Serialize,
497    Deserialize,
498    PartialOrd,
499    PartialEq,
500    Eq,
501    Ord,
502    Hash,
503    Arbitrary
504)]
505pub struct CreateClusterReplicaV2 {
506    pub cluster_id: String,
507    pub cluster_name: String,
508    pub replica_id: Option<String>,
509    pub replica_name: String,
510    pub logical_size: String,
511    pub disk: bool,
512    pub billed_as: Option<String>,
513    pub internal: bool,
514    pub reason: CreateOrDropClusterReplicaReasonV1,
515    #[serde(skip_serializing_if = "Option::is_none")]
516    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
517}
518
519#[derive(
520    Clone,
521    Debug,
522    Serialize,
523    Deserialize,
524    PartialOrd,
525    PartialEq,
526    Eq,
527    Ord,
528    Hash,
529    Arbitrary
530)]
531pub struct CreateClusterReplicaV3 {
532    pub cluster_id: String,
533    pub cluster_name: String,
534    pub replica_id: Option<String>,
535    pub replica_name: String,
536    pub logical_size: String,
537    pub disk: bool,
538    pub billed_as: Option<String>,
539    pub internal: bool,
540    pub reason: CreateOrDropClusterReplicaReasonV1,
541    #[serde(skip_serializing_if = "Option::is_none")]
542    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
543}
544
545#[derive(
546    Clone,
547    Debug,
548    Serialize,
549    Deserialize,
550    PartialOrd,
551    PartialEq,
552    Eq,
553    Ord,
554    Hash,
555    Arbitrary
556)]
557pub struct CreateClusterReplicaV4 {
558    pub cluster_id: String,
559    pub cluster_name: String,
560    pub replica_id: Option<String>,
561    pub replica_name: String,
562    pub logical_size: String,
563    pub billed_as: Option<String>,
564    pub internal: bool,
565    pub reason: CreateOrDropClusterReplicaReasonV1,
566    #[serde(skip_serializing_if = "Option::is_none")]
567    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
568}
569
570#[derive(
571    Clone,
572    Debug,
573    Serialize,
574    Deserialize,
575    PartialOrd,
576    PartialEq,
577    Eq,
578    Ord,
579    Hash,
580    Arbitrary
581)]
582#[serde(rename_all = "kebab-case")]
583pub enum CreateOrDropClusterReplicaReasonV1 {
584    Manual,
585    Schedule,
586    System,
587}
588
589/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
590/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
591/// can be settings of the policy as well as other information about the state of the system.)
592#[derive(
593    Clone,
594    Debug,
595    Serialize,
596    Deserialize,
597    PartialOrd,
598    PartialEq,
599    Eq,
600    Ord,
601    Hash,
602    Arbitrary
603)]
604pub struct SchedulingDecisionsWithReasonsV1 {
605    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
606    pub on_refresh: RefreshDecisionWithReasonV1,
607}
608
609/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
610/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
611/// can be settings of the policy as well as other information about the state of the system.)
612#[derive(
613    Clone,
614    Debug,
615    Serialize,
616    Deserialize,
617    PartialOrd,
618    PartialEq,
619    Eq,
620    Ord,
621    Hash,
622    Arbitrary
623)]
624pub struct SchedulingDecisionsWithReasonsV2 {
625    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
626    pub on_refresh: RefreshDecisionWithReasonV2,
627}
628
629#[derive(
630    Clone,
631    Debug,
632    Serialize,
633    Deserialize,
634    PartialOrd,
635    PartialEq,
636    Eq,
637    Ord,
638    Hash,
639    Arbitrary
640)]
641pub struct RefreshDecisionWithReasonV1 {
642    pub decision: SchedulingDecisionV1,
643    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
644    /// time estimate).
645    pub objects_needing_refresh: Vec<String>,
646    /// The HYDRATION TIME ESTIMATE setting of the cluster.
647    pub hydration_time_estimate: String,
648}
649
650#[derive(
651    Clone,
652    Debug,
653    Serialize,
654    Deserialize,
655    PartialOrd,
656    PartialEq,
657    Eq,
658    Ord,
659    Hash,
660    Arbitrary
661)]
662pub struct RefreshDecisionWithReasonV2 {
663    pub decision: SchedulingDecisionV1,
664    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
665    /// time estimate), and therefore should keep the cluster On.
666    pub objects_needing_refresh: Vec<String>,
667    /// Objects for which we estimate that they currently need Persist compaction, and therefore
668    /// should keep the cluster On.
669    pub objects_needing_compaction: Vec<String>,
670    /// The HYDRATION TIME ESTIMATE setting of the cluster.
671    pub hydration_time_estimate: String,
672}
673
674#[derive(
675    Clone,
676    Debug,
677    Serialize,
678    Deserialize,
679    PartialOrd,
680    PartialEq,
681    Eq,
682    Ord,
683    Hash,
684    Arbitrary
685)]
686#[serde(rename_all = "kebab-case")]
687pub enum SchedulingDecisionV1 {
688    On,
689    Off,
690}
691
692impl From<bool> for SchedulingDecisionV1 {
693    fn from(value: bool) -> Self {
694        match value {
695            true => SchedulingDecisionV1::On,
696            false => SchedulingDecisionV1::Off,
697        }
698    }
699}
700
701#[derive(
702    Clone,
703    Debug,
704    Serialize,
705    Deserialize,
706    PartialOrd,
707    PartialEq,
708    Eq,
709    Ord,
710    Hash,
711    Arbitrary
712)]
713pub struct CreateSourceSinkV1 {
714    pub id: String,
715    #[serde(flatten)]
716    pub name: FullNameV1,
717    pub size: Option<String>,
718}
719
720#[derive(
721    Clone,
722    Debug,
723    Serialize,
724    Deserialize,
725    PartialOrd,
726    PartialEq,
727    Eq,
728    Ord,
729    Hash,
730    Arbitrary
731)]
732pub struct CreateSourceSinkV2 {
733    pub id: String,
734    #[serde(flatten)]
735    pub name: FullNameV1,
736    pub size: Option<String>,
737    #[serde(rename = "type")]
738    pub external_type: String,
739}
740
741#[derive(
742    Clone,
743    Debug,
744    Serialize,
745    Deserialize,
746    PartialOrd,
747    PartialEq,
748    Eq,
749    Ord,
750    Hash,
751    Arbitrary
752)]
753pub struct CreateSourceSinkV3 {
754    pub id: String,
755    #[serde(flatten)]
756    pub name: FullNameV1,
757    #[serde(rename = "type")]
758    pub external_type: String,
759}
760
761#[derive(
762    Clone,
763    Debug,
764    Serialize,
765    Deserialize,
766    PartialOrd,
767    PartialEq,
768    Eq,
769    Ord,
770    Hash,
771    Arbitrary
772)]
773pub struct CreateSourceSinkV4 {
774    pub id: String,
775    pub cluster_id: Option<String>,
776    #[serde(flatten)]
777    pub name: FullNameV1,
778    #[serde(rename = "type")]
779    pub external_type: String,
780}
781
782#[derive(
783    Clone,
784    Debug,
785    Serialize,
786    Deserialize,
787    PartialOrd,
788    PartialEq,
789    Eq,
790    Ord,
791    Hash,
792    Arbitrary
793)]
794pub struct CreateIndexV1 {
795    pub id: String,
796    pub cluster_id: String,
797    #[serde(flatten)]
798    pub name: FullNameV1,
799}
800
801#[derive(
802    Clone,
803    Debug,
804    Serialize,
805    Deserialize,
806    PartialOrd,
807    PartialEq,
808    Eq,
809    Ord,
810    Hash,
811    Arbitrary
812)]
813pub struct CreateMaterializedViewV1 {
814    pub id: String,
815    pub cluster_id: String,
816    #[serde(flatten)]
817    pub name: FullNameV1,
818    #[serde(skip_serializing_if = "Option::is_none")]
819    pub replacement_target_id: Option<String>,
820}
821
822#[derive(
823    Clone,
824    Debug,
825    Serialize,
826    Deserialize,
827    PartialOrd,
828    PartialEq,
829    Eq,
830    Ord,
831    Hash,
832    Arbitrary
833)]
834pub struct AlterApplyReplacementV1 {
835    #[serde(flatten)]
836    pub target: IdFullNameV1,
837    pub replacement: IdFullNameV1,
838}
839
840#[derive(
841    Clone,
842    Debug,
843    Serialize,
844    Deserialize,
845    PartialOrd,
846    PartialEq,
847    Eq,
848    Ord,
849    Hash,
850    Arbitrary
851)]
852pub struct AlterSourceSinkV1 {
853    pub id: String,
854    #[serde(flatten)]
855    pub name: FullNameV1,
856    pub old_size: Option<String>,
857    pub new_size: Option<String>,
858}
859
860#[derive(
861    Clone,
862    Debug,
863    Serialize,
864    Deserialize,
865    PartialOrd,
866    PartialEq,
867    Eq,
868    Ord,
869    Hash,
870    Arbitrary
871)]
872pub struct AlterSetClusterV1 {
873    pub id: String,
874    #[serde(flatten)]
875    pub name: FullNameV1,
876    pub old_cluster_id: String,
877    pub new_cluster_id: String,
878}
879
880#[derive(
881    Clone,
882    Debug,
883    Serialize,
884    Deserialize,
885    PartialOrd,
886    PartialEq,
887    Eq,
888    Ord,
889    Hash,
890    Arbitrary
891)]
892pub struct GrantRoleV1 {
893    pub role_id: String,
894    pub member_id: String,
895    pub grantor_id: String,
896}
897
898#[derive(
899    Clone,
900    Debug,
901    Serialize,
902    Deserialize,
903    PartialOrd,
904    PartialEq,
905    Eq,
906    Ord,
907    Hash,
908    Arbitrary
909)]
910pub struct GrantRoleV2 {
911    pub role_id: String,
912    pub member_id: String,
913    pub grantor_id: String,
914    pub executed_by: String,
915}
916
917#[derive(
918    Clone,
919    Debug,
920    Serialize,
921    Deserialize,
922    PartialOrd,
923    PartialEq,
924    Eq,
925    Ord,
926    Hash,
927    Arbitrary
928)]
929pub struct RevokeRoleV1 {
930    pub role_id: String,
931    pub member_id: String,
932}
933
934#[derive(
935    Clone,
936    Debug,
937    Serialize,
938    Deserialize,
939    PartialOrd,
940    PartialEq,
941    Eq,
942    Ord,
943    Hash,
944    Arbitrary
945)]
946pub struct RevokeRoleV2 {
947    pub role_id: String,
948    pub member_id: String,
949    pub grantor_id: String,
950    pub executed_by: String,
951}
952
953#[derive(
954    Clone,
955    Debug,
956    Serialize,
957    Deserialize,
958    PartialOrd,
959    PartialEq,
960    Eq,
961    Ord,
962    Hash,
963    Arbitrary
964)]
965pub struct UpdatePrivilegeV1 {
966    pub object_id: String,
967    pub grantee_id: String,
968    pub grantor_id: String,
969    pub privileges: String,
970}
971
972#[derive(
973    Clone,
974    Debug,
975    Serialize,
976    Deserialize,
977    PartialOrd,
978    PartialEq,
979    Eq,
980    Ord,
981    Hash,
982    Arbitrary
983)]
984pub struct AlterDefaultPrivilegeV1 {
985    pub role_id: String,
986    pub database_id: Option<String>,
987    pub schema_id: Option<String>,
988    pub grantee_id: String,
989    pub privileges: String,
990}
991
992#[derive(
993    Clone,
994    Debug,
995    Serialize,
996    Deserialize,
997    PartialOrd,
998    PartialEq,
999    Eq,
1000    Ord,
1001    Hash,
1002    Arbitrary
1003)]
1004pub struct UpdateOwnerV1 {
1005    pub object_id: String,
1006    pub old_owner_id: String,
1007    pub new_owner_id: String,
1008}
1009
1010#[derive(
1011    Clone,
1012    Debug,
1013    Serialize,
1014    Deserialize,
1015    PartialOrd,
1016    PartialEq,
1017    Eq,
1018    Ord,
1019    Hash,
1020    Arbitrary
1021)]
1022pub struct SchemaV1 {
1023    pub id: String,
1024    pub name: String,
1025    pub database_name: String,
1026}
1027
1028#[derive(
1029    Clone,
1030    Debug,
1031    Serialize,
1032    Deserialize,
1033    PartialOrd,
1034    PartialEq,
1035    Eq,
1036    Ord,
1037    Hash,
1038    Arbitrary
1039)]
1040pub struct SchemaV2 {
1041    pub id: String,
1042    pub name: String,
1043    pub database_name: Option<String>,
1044}
1045
1046#[derive(
1047    Clone,
1048    Debug,
1049    Serialize,
1050    Deserialize,
1051    PartialOrd,
1052    PartialEq,
1053    Eq,
1054    Ord,
1055    Hash,
1056    Arbitrary
1057)]
1058pub struct RenameSchemaV1 {
1059    pub id: String,
1060    pub database_name: Option<String>,
1061    pub old_name: String,
1062    pub new_name: String,
1063}
1064
1065#[derive(
1066    Clone,
1067    Debug,
1068    Serialize,
1069    Deserialize,
1070    PartialOrd,
1071    PartialEq,
1072    Eq,
1073    Ord,
1074    Hash,
1075    Arbitrary
1076)]
1077pub struct AlterRetainHistoryV1 {
1078    pub id: String,
1079    pub old_history: Option<String>,
1080    pub new_history: Option<String>,
1081}
1082
1083#[derive(
1084    Clone,
1085    Debug,
1086    Serialize,
1087    Deserialize,
1088    PartialOrd,
1089    PartialEq,
1090    Eq,
1091    Ord,
1092    Hash,
1093    Arbitrary
1094)]
1095pub struct UpdateItemV1 {
1096    pub id: String,
1097    #[serde(flatten)]
1098    pub name: FullNameV1,
1099}
1100
1101#[derive(
1102    Clone,
1103    Debug,
1104    Serialize,
1105    Deserialize,
1106    PartialOrd,
1107    PartialEq,
1108    Eq,
1109    Ord,
1110    Hash,
1111    Arbitrary
1112)]
1113pub struct ToNewIdV1 {
1114    pub id: String,
1115    pub new_id: String,
1116}
1117
1118#[derive(
1119    Clone,
1120    Debug,
1121    Serialize,
1122    Deserialize,
1123    PartialOrd,
1124    PartialEq,
1125    Eq,
1126    Ord,
1127    Hash,
1128    Arbitrary
1129)]
1130pub struct FromPreviousIdV1 {
1131    pub id: String,
1132    pub previous_id: String,
1133}
1134
1135impl EventDetails {
1136    pub fn as_json(&self) -> serde_json::Value {
1137        match self {
1138            EventDetails::CreateClusterReplicaV1(v) => {
1139                serde_json::to_value(v).expect("must serialize")
1140            }
1141            EventDetails::CreateClusterReplicaV2(v) => {
1142                serde_json::to_value(v).expect("must serialize")
1143            }
1144            EventDetails::CreateClusterReplicaV3(v) => {
1145                serde_json::to_value(v).expect("must serialize")
1146            }
1147            EventDetails::CreateClusterReplicaV4(v) => {
1148                serde_json::to_value(v).expect("must serialize")
1149            }
1150            EventDetails::DropClusterReplicaV1(v) => {
1151                serde_json::to_value(v).expect("must serialize")
1152            }
1153            EventDetails::DropClusterReplicaV2(v) => {
1154                serde_json::to_value(v).expect("must serialize")
1155            }
1156            EventDetails::DropClusterReplicaV3(v) => {
1157                serde_json::to_value(v).expect("must serialize")
1158            }
1159            EventDetails::IdFullNameV1(v) => serde_json::to_value(v).expect("must serialize"),
1160            EventDetails::RenameClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
1161            EventDetails::RenameClusterReplicaV1(v) => {
1162                serde_json::to_value(v).expect("must serialize")
1163            }
1164            EventDetails::RenameItemV1(v) => serde_json::to_value(v).expect("must serialize"),
1165            EventDetails::IdNameV1(v) => serde_json::to_value(v).expect("must serialize"),
1166            EventDetails::SchemaV1(v) => serde_json::to_value(v).expect("must serialize"),
1167            EventDetails::SchemaV2(v) => serde_json::to_value(v).expect("must serialize"),
1168            EventDetails::RenameSchemaV1(v) => serde_json::to_value(v).expect("must serialize"),
1169            EventDetails::CreateSourceSinkV1(v) => serde_json::to_value(v).expect("must serialize"),
1170            EventDetails::CreateSourceSinkV2(v) => serde_json::to_value(v).expect("must serialize"),
1171            EventDetails::CreateSourceSinkV3(v) => serde_json::to_value(v).expect("must serialize"),
1172            EventDetails::CreateSourceSinkV4(v) => serde_json::to_value(v).expect("must serialize"),
1173            EventDetails::CreateIndexV1(v) => serde_json::to_value(v).expect("must serialize"),
1174            EventDetails::CreateMaterializedViewV1(v) => {
1175                serde_json::to_value(v).expect("must serialize")
1176            }
1177            EventDetails::AlterApplyReplacementV1(v) => {
1178                serde_json::to_value(v).expect("must serialize")
1179            }
1180            EventDetails::AlterSourceSinkV1(v) => serde_json::to_value(v).expect("must serialize"),
1181            EventDetails::AlterSetClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
1182            EventDetails::GrantRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
1183            EventDetails::GrantRoleV2(v) => serde_json::to_value(v).expect("must serialize"),
1184            EventDetails::RevokeRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
1185            EventDetails::RevokeRoleV2(v) => serde_json::to_value(v).expect("must serialize"),
1186            EventDetails::UpdatePrivilegeV1(v) => serde_json::to_value(v).expect("must serialize"),
1187            EventDetails::AlterDefaultPrivilegeV1(v) => {
1188                serde_json::to_value(v).expect("must serialize")
1189            }
1190            EventDetails::UpdateOwnerV1(v) => serde_json::to_value(v).expect("must serialize"),
1191            EventDetails::UpdateItemV1(v) => serde_json::to_value(v).expect("must serialize"),
1192            EventDetails::AlterRetainHistoryV1(v) => {
1193                serde_json::to_value(v).expect("must serialize")
1194            }
1195            EventDetails::ToNewIdV1(v) => serde_json::to_value(v).expect("must serialize"),
1196            EventDetails::FromPreviousIdV1(v) => serde_json::to_value(v).expect("must serialize"),
1197            EventDetails::SetV1(v) => serde_json::to_value(v).expect("must serialize"),
1198            EventDetails::ResetAllV1 => serde_json::Value::Null,
1199            EventDetails::RotateKeysV1(v) => serde_json::to_value(v).expect("must serialize"),
1200            EventDetails::CreateRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
1201        }
1202    }
1203}
1204
1205#[derive(
1206    Clone,
1207    Debug,
1208    Serialize,
1209    Deserialize,
1210    PartialOrd,
1211    PartialEq,
1212    Eq,
1213    Ord,
1214    Hash,
1215    Arbitrary
1216)]
1217pub struct EventV1 {
1218    pub id: u64,
1219    pub event_type: EventType,
1220    pub object_type: ObjectType,
1221    pub details: EventDetails,
1222    pub user: Option<String>,
1223    pub occurred_at: EpochMillis,
1224}
1225
1226impl EventV1 {
1227    fn new(
1228        id: u64,
1229        event_type: EventType,
1230        object_type: ObjectType,
1231        details: EventDetails,
1232        user: Option<String>,
1233        occurred_at: EpochMillis,
1234    ) -> EventV1 {
1235        EventV1 {
1236            id,
1237            event_type,
1238            object_type,
1239            details,
1240            user,
1241            occurred_at,
1242        }
1243    }
1244}
1245
1246#[derive(
1247    Clone,
1248    Debug,
1249    Serialize,
1250    Deserialize,
1251    PartialOrd,
1252    PartialEq,
1253    Eq,
1254    Ord,
1255    Hash,
1256    Arbitrary
1257)]
1258pub struct StorageUsageV1 {
1259    pub id: u64,
1260    pub shard_id: Option<String>,
1261    pub size_bytes: u64,
1262    pub collection_timestamp: EpochMillis,
1263}
1264
1265impl StorageUsageV1 {
1266    pub fn new(
1267        id: u64,
1268        shard_id: Option<String>,
1269        size_bytes: u64,
1270        collection_timestamp: EpochMillis,
1271    ) -> StorageUsageV1 {
1272        StorageUsageV1 {
1273            id,
1274            shard_id,
1275            size_bytes,
1276            collection_timestamp,
1277        }
1278    }
1279}
1280
1281/// Describes the environment's storage usage at a point in time.
1282///
1283/// This type is persisted in the catalog across restarts, so any updates to the
1284/// schema will require a new version.
1285#[derive(
1286    Clone,
1287    Debug,
1288    Serialize,
1289    Deserialize,
1290    PartialOrd,
1291    PartialEq,
1292    Eq,
1293    Ord,
1294    Hash,
1295    Arbitrary
1296)]
1297pub enum VersionedStorageUsage {
1298    V1(StorageUsageV1),
1299}
1300
1301impl VersionedStorageUsage {
1302    /// Create a new metric snapshot.
1303    /// This function must always require and produce the most
1304    /// recent variant of VersionedStorageMetrics.
1305    pub fn new(
1306        id: u64,
1307        object_id: Option<String>,
1308        size_bytes: u64,
1309        collection_timestamp: EpochMillis,
1310    ) -> Self {
1311        Self::V1(StorageUsageV1::new(
1312            id,
1313            object_id,
1314            size_bytes,
1315            collection_timestamp,
1316        ))
1317    }
1318
1319    // Implement deserialize and serialize so writers and readers don't have to
1320    // coordinate about which Serializer to use.
1321    pub fn deserialize(data: &[u8]) -> Result<Self, anyhow::Error> {
1322        Ok(serde_json::from_slice(data)?)
1323    }
1324
1325    pub fn serialize(&self) -> Vec<u8> {
1326        serde_json::to_vec(self).expect("must serialize")
1327    }
1328
1329    pub fn timestamp(&self) -> EpochMillis {
1330        match self {
1331            VersionedStorageUsage::V1(StorageUsageV1 {
1332                collection_timestamp,
1333                ..
1334            }) => *collection_timestamp,
1335        }
1336    }
1337
1338    /// Returns a globally sortable event order. All event versions must have this
1339    /// field.
1340    pub fn sortable_id(&self) -> u64 {
1341        match self {
1342            VersionedStorageUsage::V1(usage) => usage.id,
1343        }
1344    }
1345}
1346
1347#[cfg(test)]
1348mod tests {
1349    use crate::{EventDetails, EventType, EventV1, IdNameV1, ObjectType, VersionedEvent};
1350
1351    // Test all versions of events. This test hard codes bytes so that
1352    // programmers are not able to change data structures here without this test
1353    // failing. Instead of changing data structures, add new variants.
1354    #[mz_ore::test]
1355    fn test_audit_log() -> Result<(), anyhow::Error> {
1356        let cases: Vec<(VersionedEvent, &'static str)> = vec![(
1357            VersionedEvent::V1(EventV1::new(
1358                2,
1359                EventType::Drop,
1360                ObjectType::ClusterReplica,
1361                EventDetails::IdNameV1(IdNameV1 {
1362                    id: "u1".to_string(),
1363                    name: "name".into(),
1364                }),
1365                None,
1366                2,
1367            )),
1368            r#"{"V1":{"id":2,"event_type":"drop","object_type":"cluster-replica","details":{"IdNameV1":{"id":"u1","name":"name"}},"user":null,"occurred_at":2}}"#,
1369        )];
1370
1371        for (event, expected_bytes) in cases {
1372            let event_bytes = serde_json::to_vec(&event).unwrap();
1373            assert_eq!(
1374                event_bytes,
1375                expected_bytes.as_bytes(),
1376                "expected bytes {}, got {}",
1377                expected_bytes,
1378                std::str::from_utf8(&event_bytes).unwrap(),
1379            );
1380        }
1381
1382        Ok(())
1383    }
1384}