Skip to main content

mz_audit_log/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Audit log data structures.
11//!
12//! The audit log is logging that is produced by user actions and consumed
13//! by users in the form of the `mz_catalog.mz_audit_events` SQL table and
14//! by the cloud management layer for billing and introspection. This crate
15//! is designed to make the production and consumption of the logs type
16//! safe. Events and their metadata are versioned and the data structures
17//! replicated here so that if the data change in some other crate, a
18//! new version here can be made. This avoids needing to poke at the data
19//! when reading it to determine what it means and should have full backward
20//! compatibility. This is its own crate so that production and consumption can
21//! be in different processes and production is not allowed to specify private
22//! data structures unknown to the reader.
23
24use mz_ore::now::EpochMillis;
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27
/// A version-tagged audit log event.
///
/// New version variants should be added if fields need to be added, changed, or removed.
/// With the derived serde implementation the variant name (for example `V1`) is the outer key
/// of the serialized JSON object.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub enum VersionedEvent {
    /// Version 1 of the event layout; see [`EventV1`].
    V1(EventV1),
}
44
45impl VersionedEvent {
46    /// Create a new event. This function must always require and produce the most
47    /// recent variant of VersionedEvent. `id` must be a globally increasing,
48    /// ordered number such that sorting by it on all events yields the order
49    /// of events by users. It is insufficient to use `occurred_at` (even at
50    /// nanosecond precision) due to clock unpredictability.
51    pub fn new(
52        id: u64,
53        event_type: EventType,
54        object_type: ObjectType,
55        details: EventDetails,
56        user: Option<String>,
57        occurred_at: EpochMillis,
58    ) -> Self {
59        Self::V1(EventV1::new(
60            id,
61            event_type,
62            object_type,
63            details,
64            user,
65            occurred_at,
66        ))
67    }
68
69    // Implement deserialize and serialize so writers and readers don't have to
70    // coordinate about which Serializer to use.
71    pub fn deserialize(data: &[u8]) -> Result<Self, anyhow::Error> {
72        Ok(serde_json::from_slice(data)?)
73    }
74
75    pub fn serialize(&self) -> Vec<u8> {
76        serde_json::to_vec(self).expect("must serialize")
77    }
78
79    /// Returns a globally sortable event order. All event versions must have this
80    /// field.
81    pub fn sortable_id(&self) -> u64 {
82        match self {
83            VersionedEvent::V1(ev) => ev.id,
84        }
85    }
86}
87
/// The kind of action an audit event records.
///
/// Serialized in kebab-case, so `Create` becomes `"create"`, `Drop` becomes `"drop"`, and so on.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
#[serde(rename_all = "kebab-case")]
pub enum EventType {
    Create,
    Drop,
    Alter,
    Grant,
    Revoke,
    Comment,
}
109
110impl EventType {
111    pub fn as_title_case(&self) -> &'static str {
112        match self {
113            EventType::Create => "Created",
114            EventType::Drop => "Dropped",
115            EventType::Alter => "Altered",
116            EventType::Grant => "Granted",
117            EventType::Revoke => "Revoked",
118            EventType::Comment => "Comment",
119        }
120    }
121}
122
// Derives `Display` for `EventType` from its `Serialize` implementation, so the displayed text
// is the serialized (kebab-case) name.
serde_plain::derive_display_from_serialize!(EventType);
124
/// The kind of object an audit event refers to.
///
/// Serialized in kebab-case, so `ClusterReplica` becomes `"cluster-replica"`.
#[derive(
    Clone,
    Copy,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
#[serde(rename_all = "kebab-case")]
pub enum ObjectType {
    Cluster,
    ClusterReplica,
    Connection,
    ContinualTask,
    Database,
    Func,
    Index,
    MaterializedView,
    NetworkPolicy,
    Role,
    Secret,
    Schema,
    Sink,
    Source,
    System,
    Table,
    Type,
    View,
}
159
160impl ObjectType {
161    pub fn as_title_case(&self) -> &'static str {
162        match self {
163            ObjectType::Cluster => "Cluster",
164            ObjectType::ClusterReplica => "Cluster Replica",
165            ObjectType::Connection => "Connection",
166            ObjectType::ContinualTask => "Continual Task",
167            ObjectType::Database => "Database",
168            ObjectType::Func => "Function",
169            ObjectType::Index => "Index",
170            ObjectType::MaterializedView => "Materialized View",
171            ObjectType::NetworkPolicy => "Network Policy",
172            ObjectType::Role => "Role",
173            ObjectType::Schema => "Schema",
174            ObjectType::Secret => "Secret",
175            ObjectType::Sink => "Sink",
176            ObjectType::Source => "Source",
177            ObjectType::System => "System",
178            ObjectType::Table => "Table",
179            ObjectType::Type => "Type",
180            ObjectType::View => "View",
181        }
182    }
183}
184
// Derives `Display` for `ObjectType` from its `Serialize` implementation, so the displayed text
// is the serialized (kebab-case) name.
serde_plain::derive_display_from_serialize!(ObjectType);
186
/// The event-specific payload stored in [`EventV1::details`].
///
/// Every variant except `ResetAllV1` wraps the details struct of the same name. With the
/// derived serde implementation a variant is serialized under its variant name (for example
/// `{"IdNameV1":{...}}`), except where a `rename` attribute keeps a historical name.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub enum EventDetails {
    #[serde(rename = "CreateComputeReplicaV1")] // historical name
    CreateClusterReplicaV1(CreateClusterReplicaV1),
    CreateClusterReplicaV2(CreateClusterReplicaV2),
    CreateClusterReplicaV3(CreateClusterReplicaV3),
    CreateClusterReplicaV4(CreateClusterReplicaV4),
    #[serde(rename = "DropComputeReplicaV1")] // historical name
    DropClusterReplicaV1(DropClusterReplicaV1),
    DropClusterReplicaV2(DropClusterReplicaV2),
    DropClusterReplicaV3(DropClusterReplicaV3),
    CreateSourceSinkV1(CreateSourceSinkV1),
    CreateSourceSinkV2(CreateSourceSinkV2),
    CreateSourceSinkV3(CreateSourceSinkV3),
    CreateSourceSinkV4(CreateSourceSinkV4),
    CreateIndexV1(CreateIndexV1),
    CreateMaterializedViewV1(CreateMaterializedViewV1),
    AlterApplyReplacementV1(AlterApplyReplacementV1),
    AlterSetClusterV1(AlterSetClusterV1),
    AlterSourceSinkV1(AlterSourceSinkV1),
    GrantRoleV1(GrantRoleV1),
    GrantRoleV2(GrantRoleV2),
    RevokeRoleV1(RevokeRoleV1),
    RevokeRoleV2(RevokeRoleV2),
    UpdatePrivilegeV1(UpdatePrivilegeV1),
    AlterDefaultPrivilegeV1(AlterDefaultPrivilegeV1),
    UpdateOwnerV1(UpdateOwnerV1),
    IdFullNameV1(IdFullNameV1),
    RenameClusterV1(RenameClusterV1),
    RenameClusterReplicaV1(RenameClusterReplicaV1),
    RenameItemV1(RenameItemV1),
    IdNameV1(IdNameV1),
    SchemaV1(SchemaV1),
    SchemaV2(SchemaV2),
    UpdateItemV1(UpdateItemV1),
    RenameSchemaV1(RenameSchemaV1),
    AlterRetainHistoryV1(AlterRetainHistoryV1),
    ToNewIdV1(ToNewIdV1),
    FromPreviousIdV1(FromPreviousIdV1),
    SetV1(SetV1),
    /// Carries no payload.
    ResetAllV1,
    RotateKeysV1(RotateKeysV1),
}
241
/// Payload of [`EventDetails::SetV1`]: a `name` together with an optional `value`.
// NOTE(review): presumably the name and new value of a setting; confirm against the producer.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct SetV1 {
    pub name: String,
    pub value: Option<String>,
}
258
/// Payload of [`EventDetails::RotateKeysV1`], carrying an `id` and a `name`.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct RotateKeysV1 {
    pub id: String,
    pub name: String,
}
275
/// An `id` paired with a fully qualified name.
///
/// Payload of [`EventDetails::IdFullNameV1`], and also embedded in
/// [`AlterApplyReplacementV1`].
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct IdFullNameV1 {
    pub id: String,
    /// Flattened: the `database`, `schema` and `item` keys appear next to `id` rather than
    /// under a nested `name` object.
    #[serde(flatten)]
    pub name: FullNameV1,
}
293
/// A name made of `database`, `schema` and `item` parts.
///
/// Most details structs embed it with `#[serde(flatten)]`; [`RenameItemV1`] nests it as a
/// regular field instead.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct FullNameV1 {
    pub database: String,
    pub schema: String,
    pub item: String,
}
311
/// Payload of [`EventDetails::IdNameV1`]: an `id` paired with a plain (unqualified) `name`.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct IdNameV1 {
    pub id: String,
    pub name: String,
}
328
/// Payload of [`EventDetails::RenameItemV1`]: an `id` with its old and new full names.
///
/// Unlike most other uses of [`FullNameV1`], the two names are not flattened; they are
/// serialized as nested `old_name` and `new_name` objects.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct RenameItemV1 {
    pub id: String,
    pub old_name: FullNameV1,
    pub new_name: FullNameV1,
}
346
/// Payload of [`EventDetails::RenameClusterV1`]: an `id` with its old and new names.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct RenameClusterV1 {
    pub id: String,
    pub old_name: String,
    pub new_name: String,
}
364
/// Payload of [`EventDetails::RenameClusterReplicaV1`]: the cluster and replica ids together
/// with the old and new names.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct RenameClusterReplicaV1 {
    pub cluster_id: String,
    pub replica_id: String,
    pub old_name: String,
    pub new_name: String,
}
383
/// Version 1 of the details for dropping a cluster replica.
///
/// Stored in [`EventDetails::DropClusterReplicaV1`], which is serialized under the historical
/// name `DropComputeReplicaV1`.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct DropClusterReplicaV1 {
    pub cluster_id: String,
    pub cluster_name: String,
    // Events that predate v0.32.0 will not have this field set.
    // The key is omitted from the serialized output when the value is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub replica_id: Option<String>,
    pub replica_name: String,
}
404
/// Version 2 of the details for dropping a cluster replica.
///
/// Adds `reason` and the optional `scheduling_policies` to the fields of
/// [`DropClusterReplicaV1`]. Unlike V1, `replica_id` has no `skip_serializing_if`, so a `None`
/// value is serialized explicitly.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct DropClusterReplicaV2 {
    pub cluster_id: String,
    pub cluster_name: String,
    pub replica_id: Option<String>,
    pub replica_name: String,
    pub reason: CreateOrDropClusterReplicaReasonV1,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
}
426
/// Version 3 of the details for dropping a cluster replica.
///
/// Same fields as [`DropClusterReplicaV2`], except that `scheduling_policies` holds a
/// [`SchedulingDecisionsWithReasonsV2`].
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct DropClusterReplicaV3 {
    pub cluster_id: String,
    pub cluster_name: String,
    pub replica_id: Option<String>,
    pub replica_name: String,
    pub reason: CreateOrDropClusterReplicaReasonV1,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
}
448
/// Version 1 of the details for creating a cluster replica.
///
/// Stored in [`EventDetails::CreateClusterReplicaV1`], which is serialized under the
/// historical name `CreateComputeReplicaV1`.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct CreateClusterReplicaV1 {
    pub cluster_id: String,
    pub cluster_name: String,
    // Events that predate v0.32.0 will not have this field set.
    // The key is omitted from the serialized output when the value is `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub replica_id: Option<String>,
    pub replica_name: String,
    pub logical_size: String,
    pub disk: bool,
    pub billed_as: Option<String>,
    pub internal: bool,
}
473
/// Version 2 of the details for creating a cluster replica.
///
/// Adds `reason` and the optional `scheduling_policies` to the fields of
/// [`CreateClusterReplicaV1`]. Unlike V1, `replica_id` has no `skip_serializing_if`, so a
/// `None` value is serialized explicitly.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct CreateClusterReplicaV2 {
    pub cluster_id: String,
    pub cluster_name: String,
    pub replica_id: Option<String>,
    pub replica_name: String,
    pub logical_size: String,
    pub disk: bool,
    pub billed_as: Option<String>,
    pub internal: bool,
    pub reason: CreateOrDropClusterReplicaReasonV1,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
}
499
/// Version 3 of the details for creating a cluster replica.
///
/// Same fields as [`CreateClusterReplicaV2`], except that `scheduling_policies` holds a
/// [`SchedulingDecisionsWithReasonsV2`].
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct CreateClusterReplicaV3 {
    pub cluster_id: String,
    pub cluster_name: String,
    pub replica_id: Option<String>,
    pub replica_name: String,
    pub logical_size: String,
    pub disk: bool,
    pub billed_as: Option<String>,
    pub internal: bool,
    pub reason: CreateOrDropClusterReplicaReasonV1,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
}
525
/// Version 4 of the details for creating a cluster replica.
///
/// Same fields as [`CreateClusterReplicaV3`] minus `disk`.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct CreateClusterReplicaV4 {
    pub cluster_id: String,
    pub cluster_name: String,
    pub replica_id: Option<String>,
    pub replica_name: String,
    pub logical_size: String,
    pub billed_as: Option<String>,
    pub internal: bool,
    pub reason: CreateOrDropClusterReplicaReasonV1,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
}
550
/// The reason recorded for creating or dropping a cluster replica.
///
/// Serialized in kebab-case: `"manual"`, `"schedule"` or `"system"`.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
#[serde(rename_all = "kebab-case")]
pub enum CreateOrDropClusterReplicaReasonV1 {
    Manual,
    Schedule,
    System,
}
569
/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
/// can be settings of the policy as well as other information about the state of the system.)
///
/// Version 1 records a single policy, `on_refresh`, as a [`RefreshDecisionWithReasonV1`].
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct SchedulingDecisionsWithReasonsV1 {
    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
    pub on_refresh: RefreshDecisionWithReasonV1,
}
589
/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
/// can be settings of the policy as well as other information about the state of the system.)
///
/// Version 2 records a single policy, `on_refresh`, as a [`RefreshDecisionWithReasonV2`].
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct SchedulingDecisionsWithReasonsV2 {
    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
    pub on_refresh: RefreshDecisionWithReasonV2,
}
609
/// Version 1 of the refresh policy's On/Off decision together with the inputs recorded as its
/// reason.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct RefreshDecisionWithReasonV1 {
    /// The On/Off decision itself.
    pub decision: SchedulingDecisionV1,
    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
    /// time estimate).
    pub objects_needing_refresh: Vec<String>,
    /// The HYDRATION TIME ESTIMATE setting of the cluster.
    pub hydration_time_estimate: String,
}
630
/// Version 2 of the refresh policy's On/Off decision together with the inputs recorded as its
/// reason. Adds `objects_needing_compaction` to the fields of [`RefreshDecisionWithReasonV1`].
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct RefreshDecisionWithReasonV2 {
    /// The On/Off decision itself.
    pub decision: SchedulingDecisionV1,
    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
    /// time estimate), and therefore should keep the cluster On.
    pub objects_needing_refresh: Vec<String>,
    /// Objects for which we estimate that they currently need Persist compaction, and therefore
    /// should keep the cluster On.
    pub objects_needing_compaction: Vec<String>,
    /// The HYDRATION TIME ESTIMATE setting of the cluster.
    pub hydration_time_estimate: String,
}
654
/// An On/Off scheduling decision.
///
/// Serialized in kebab-case: `"on"` or `"off"`. Can be built from a `bool` via `From<bool>`.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
#[serde(rename_all = "kebab-case")]
pub enum SchedulingDecisionV1 {
    On,
    Off,
}
672
673impl From<bool> for SchedulingDecisionV1 {
674    fn from(value: bool) -> Self {
675        match value {
676            true => SchedulingDecisionV1::On,
677            false => SchedulingDecisionV1::Off,
678        }
679    }
680}
681
/// Version 1 of the details for creating a source or sink: `id`, full name and optional `size`.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct CreateSourceSinkV1 {
    pub id: String,
    /// Flattened: serialized as top-level `database`, `schema` and `item` keys.
    #[serde(flatten)]
    pub name: FullNameV1,
    pub size: Option<String>,
}
700
/// Version 2 of the details for creating a source or sink.
///
/// Adds `external_type` to the fields of [`CreateSourceSinkV1`].
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct CreateSourceSinkV2 {
    pub id: String,
    /// Flattened: serialized as top-level `database`, `schema` and `item` keys.
    #[serde(flatten)]
    pub name: FullNameV1,
    pub size: Option<String>,
    /// Serialized under the key `type`.
    #[serde(rename = "type")]
    pub external_type: String,
}
721
/// Version 3 of the details for creating a source or sink.
///
/// Same fields as [`CreateSourceSinkV2`] minus `size`.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct CreateSourceSinkV3 {
    pub id: String,
    /// Flattened: serialized as top-level `database`, `schema` and `item` keys.
    #[serde(flatten)]
    pub name: FullNameV1,
    /// Serialized under the key `type`.
    #[serde(rename = "type")]
    pub external_type: String,
}
741
/// Version 4 of the details for creating a source or sink.
///
/// Adds an optional `cluster_id` to the fields of [`CreateSourceSinkV3`].
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct CreateSourceSinkV4 {
    pub id: String,
    pub cluster_id: Option<String>,
    /// Flattened: serialized as top-level `database`, `schema` and `item` keys.
    #[serde(flatten)]
    pub name: FullNameV1,
    /// Serialized under the key `type`.
    #[serde(rename = "type")]
    pub external_type: String,
}
762
/// Payload of [`EventDetails::CreateIndexV1`]: the index `id`, its `cluster_id` and full name.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct CreateIndexV1 {
    pub id: String,
    pub cluster_id: String,
    /// Flattened: serialized as top-level `database`, `schema` and `item` keys.
    #[serde(flatten)]
    pub name: FullNameV1,
}
781
/// Payload of [`EventDetails::CreateMaterializedViewV1`]: `id`, `cluster_id`, full name and an
/// optional `replacement_target_id`.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct CreateMaterializedViewV1 {
    pub id: String,
    pub cluster_id: String,
    /// Flattened: serialized as top-level `database`, `schema` and `item` keys.
    #[serde(flatten)]
    pub name: FullNameV1,
    /// Omitted from the serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub replacement_target_id: Option<String>,
}
802
/// Payload of [`EventDetails::AlterApplyReplacementV1`]: a `target` and its `replacement`, each
/// identified by id and full name.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct AlterApplyReplacementV1 {
    /// Flattened: the target's `id`, `database`, `schema` and `item` keys are serialized at the
    /// top level of this object.
    #[serde(flatten)]
    pub target: IdFullNameV1,
    /// Serialized as a nested `replacement` object.
    pub replacement: IdFullNameV1,
}
820
/// Payload of [`EventDetails::AlterSourceSinkV1`]: `id`, full name and the optional old and new
/// sizes.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct AlterSourceSinkV1 {
    pub id: String,
    /// Flattened: serialized as top-level `database`, `schema` and `item` keys.
    #[serde(flatten)]
    pub name: FullNameV1,
    pub old_size: Option<String>,
    pub new_size: Option<String>,
}
840
/// Payload of [`EventDetails::AlterSetClusterV1`]: `id`, full name and the old and new cluster
/// ids.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct AlterSetClusterV1 {
    pub id: String,
    /// Flattened: serialized as top-level `database`, `schema` and `item` keys.
    #[serde(flatten)]
    pub name: FullNameV1,
    pub old_cluster_id: String,
    pub new_cluster_id: String,
}
860
/// Version 1 of the role grant details: the ids of the role, the member and the grantor.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct GrantRoleV1 {
    pub role_id: String,
    pub member_id: String,
    pub grantor_id: String,
}
878
/// Version 2 of the role grant details. Adds `executed_by` to the fields of [`GrantRoleV1`].
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct GrantRoleV2 {
    pub role_id: String,
    pub member_id: String,
    pub grantor_id: String,
    pub executed_by: String,
}
897
/// Version 1 of the role revocation details: the ids of the role and the member.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct RevokeRoleV1 {
    pub role_id: String,
    pub member_id: String,
}
914
/// Version 2 of the role revocation details. Adds `grantor_id` and `executed_by` to the fields
/// of [`RevokeRoleV1`].
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct RevokeRoleV2 {
    pub role_id: String,
    pub member_id: String,
    pub grantor_id: String,
    pub executed_by: String,
}
933
/// Payload of [`EventDetails::UpdatePrivilegeV1`]: the object, grantee and grantor ids together
/// with the privileges as a string.
// NOTE(review): the format of `privileges` is not visible in this file; confirm with the producer.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct UpdatePrivilegeV1 {
    pub object_id: String,
    pub grantee_id: String,
    pub grantor_id: String,
    pub privileges: String,
}
952
/// Payload of [`EventDetails::AlterDefaultPrivilegeV1`]: the role and grantee ids, optional
/// database and schema ids, and the privileges as a string.
// NOTE(review): the format of `privileges` is not visible in this file; confirm with the producer.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct AlterDefaultPrivilegeV1 {
    pub role_id: String,
    pub database_id: Option<String>,
    pub schema_id: Option<String>,
    pub grantee_id: String,
    pub privileges: String,
}
972
/// Payload of [`EventDetails::UpdateOwnerV1`]: the object id with the old and new owner ids.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct UpdateOwnerV1 {
    pub object_id: String,
    pub old_owner_id: String,
    pub new_owner_id: String,
}
990
/// Version 1 of the schema details: `id`, `name` and a required `database_name`.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct SchemaV1 {
    pub id: String,
    pub name: String,
    pub database_name: String,
}
1008
/// Version 2 of the schema details. Differs from [`SchemaV1`] in that `database_name` is
/// optional.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct SchemaV2 {
    pub id: String,
    pub name: String,
    pub database_name: Option<String>,
}
1026
/// Payload of [`EventDetails::RenameSchemaV1`]: the schema `id`, an optional `database_name`,
/// and the old and new names.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct RenameSchemaV1 {
    pub id: String,
    pub database_name: Option<String>,
    pub old_name: String,
    pub new_name: String,
}
1045
/// Payload of [`EventDetails::AlterRetainHistoryV1`]: an `id` with the optional old and new
/// history values, both stored as strings.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct AlterRetainHistoryV1 {
    pub id: String,
    pub old_history: Option<String>,
    pub new_history: Option<String>,
}
1063
/// Payload of [`EventDetails::UpdateItemV1`]: an `id` with its full name.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct UpdateItemV1 {
    pub id: String,
    /// Flattened: serialized as top-level `database`, `schema` and `item` keys.
    #[serde(flatten)]
    pub name: FullNameV1,
}
1081
/// Payload of [`EventDetails::ToNewIdV1`]: an `id` together with a `new_id`.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct ToNewIdV1 {
    pub id: String,
    pub new_id: String,
}
1098
/// Payload of [`EventDetails::FromPreviousIdV1`]: an `id` together with a `previous_id`.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct FromPreviousIdV1 {
    pub id: String,
    pub previous_id: String,
}
1115
1116impl EventDetails {
1117    pub fn as_json(&self) -> serde_json::Value {
1118        match self {
1119            EventDetails::CreateClusterReplicaV1(v) => {
1120                serde_json::to_value(v).expect("must serialize")
1121            }
1122            EventDetails::CreateClusterReplicaV2(v) => {
1123                serde_json::to_value(v).expect("must serialize")
1124            }
1125            EventDetails::CreateClusterReplicaV3(v) => {
1126                serde_json::to_value(v).expect("must serialize")
1127            }
1128            EventDetails::CreateClusterReplicaV4(v) => {
1129                serde_json::to_value(v).expect("must serialize")
1130            }
1131            EventDetails::DropClusterReplicaV1(v) => {
1132                serde_json::to_value(v).expect("must serialize")
1133            }
1134            EventDetails::DropClusterReplicaV2(v) => {
1135                serde_json::to_value(v).expect("must serialize")
1136            }
1137            EventDetails::DropClusterReplicaV3(v) => {
1138                serde_json::to_value(v).expect("must serialize")
1139            }
1140            EventDetails::IdFullNameV1(v) => serde_json::to_value(v).expect("must serialize"),
1141            EventDetails::RenameClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
1142            EventDetails::RenameClusterReplicaV1(v) => {
1143                serde_json::to_value(v).expect("must serialize")
1144            }
1145            EventDetails::RenameItemV1(v) => serde_json::to_value(v).expect("must serialize"),
1146            EventDetails::IdNameV1(v) => serde_json::to_value(v).expect("must serialize"),
1147            EventDetails::SchemaV1(v) => serde_json::to_value(v).expect("must serialize"),
1148            EventDetails::SchemaV2(v) => serde_json::to_value(v).expect("must serialize"),
1149            EventDetails::RenameSchemaV1(v) => serde_json::to_value(v).expect("must serialize"),
1150            EventDetails::CreateSourceSinkV1(v) => serde_json::to_value(v).expect("must serialize"),
1151            EventDetails::CreateSourceSinkV2(v) => serde_json::to_value(v).expect("must serialize"),
1152            EventDetails::CreateSourceSinkV3(v) => serde_json::to_value(v).expect("must serialize"),
1153            EventDetails::CreateSourceSinkV4(v) => serde_json::to_value(v).expect("must serialize"),
1154            EventDetails::CreateIndexV1(v) => serde_json::to_value(v).expect("must serialize"),
1155            EventDetails::CreateMaterializedViewV1(v) => {
1156                serde_json::to_value(v).expect("must serialize")
1157            }
1158            EventDetails::AlterApplyReplacementV1(v) => {
1159                serde_json::to_value(v).expect("must serialize")
1160            }
1161            EventDetails::AlterSourceSinkV1(v) => serde_json::to_value(v).expect("must serialize"),
1162            EventDetails::AlterSetClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
1163            EventDetails::GrantRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
1164            EventDetails::GrantRoleV2(v) => serde_json::to_value(v).expect("must serialize"),
1165            EventDetails::RevokeRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
1166            EventDetails::RevokeRoleV2(v) => serde_json::to_value(v).expect("must serialize"),
1167            EventDetails::UpdatePrivilegeV1(v) => serde_json::to_value(v).expect("must serialize"),
1168            EventDetails::AlterDefaultPrivilegeV1(v) => {
1169                serde_json::to_value(v).expect("must serialize")
1170            }
1171            EventDetails::UpdateOwnerV1(v) => serde_json::to_value(v).expect("must serialize"),
1172            EventDetails::UpdateItemV1(v) => serde_json::to_value(v).expect("must serialize"),
1173            EventDetails::AlterRetainHistoryV1(v) => {
1174                serde_json::to_value(v).expect("must serialize")
1175            }
1176            EventDetails::ToNewIdV1(v) => serde_json::to_value(v).expect("must serialize"),
1177            EventDetails::FromPreviousIdV1(v) => serde_json::to_value(v).expect("must serialize"),
1178            EventDetails::SetV1(v) => serde_json::to_value(v).expect("must serialize"),
1179            EventDetails::ResetAllV1 => serde_json::Value::Null,
1180            EventDetails::RotateKeysV1(v) => serde_json::to_value(v).expect("must serialize"),
1181        }
1182    }
1183}
1184
/// Version 1 of an audit log event, wrapped by [`VersionedEvent::V1`].
///
/// The field names are used unchanged as the keys of the serialized JSON object.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct EventV1 {
    /// The value returned by [`VersionedEvent::sortable_id`].
    pub id: u64,
    pub event_type: EventType,
    pub object_type: ObjectType,
    pub details: EventDetails,
    pub user: Option<String>,
    // NOTE(review): presumably milliseconds since the Unix epoch, going by the `EpochMillis`
    // type name; confirm against `mz_ore::now`.
    pub occurred_at: EpochMillis,
}
1205
1206impl EventV1 {
1207    fn new(
1208        id: u64,
1209        event_type: EventType,
1210        object_type: ObjectType,
1211        details: EventDetails,
1212        user: Option<String>,
1213        occurred_at: EpochMillis,
1214    ) -> EventV1 {
1215        EventV1 {
1216            id,
1217            event_type,
1218            object_type,
1219            details,
1220            user,
1221            occurred_at,
1222        }
1223    }
1224}
1225
/// Version 1 of a storage usage record, wrapped by [`VersionedStorageUsage::V1`].
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub struct StorageUsageV1 {
    /// The value returned by [`VersionedStorageUsage::sortable_id`].
    pub id: u64,
    pub shard_id: Option<String>,
    pub size_bytes: u64,
    /// The value returned by [`VersionedStorageUsage::timestamp`].
    pub collection_timestamp: EpochMillis,
}
1244
1245impl StorageUsageV1 {
1246    pub fn new(
1247        id: u64,
1248        shard_id: Option<String>,
1249        size_bytes: u64,
1250        collection_timestamp: EpochMillis,
1251    ) -> StorageUsageV1 {
1252        StorageUsageV1 {
1253            id,
1254            shard_id,
1255            size_bytes,
1256            collection_timestamp,
1257        }
1258    }
1259}
1260
/// Describes the environment's storage usage at a point in time.
///
/// This type is persisted in the catalog across restarts, so any updates to the
/// schema will require a new version.
#[derive(
    Clone,
    Debug,
    Serialize,
    Deserialize,
    PartialOrd,
    PartialEq,
    Eq,
    Ord,
    Hash,
    Arbitrary
)]
pub enum VersionedStorageUsage {
    /// Version 1 of the storage usage layout; see [`StorageUsageV1`].
    V1(StorageUsageV1),
}
1280
1281impl VersionedStorageUsage {
1282    /// Create a new metric snapshot.
1283    /// This function must always require and produce the most
1284    /// recent variant of VersionedStorageMetrics.
1285    pub fn new(
1286        id: u64,
1287        object_id: Option<String>,
1288        size_bytes: u64,
1289        collection_timestamp: EpochMillis,
1290    ) -> Self {
1291        Self::V1(StorageUsageV1::new(
1292            id,
1293            object_id,
1294            size_bytes,
1295            collection_timestamp,
1296        ))
1297    }
1298
1299    // Implement deserialize and serialize so writers and readers don't have to
1300    // coordinate about which Serializer to use.
1301    pub fn deserialize(data: &[u8]) -> Result<Self, anyhow::Error> {
1302        Ok(serde_json::from_slice(data)?)
1303    }
1304
1305    pub fn serialize(&self) -> Vec<u8> {
1306        serde_json::to_vec(self).expect("must serialize")
1307    }
1308
1309    pub fn timestamp(&self) -> EpochMillis {
1310        match self {
1311            VersionedStorageUsage::V1(StorageUsageV1 {
1312                collection_timestamp,
1313                ..
1314            }) => *collection_timestamp,
1315        }
1316    }
1317
1318    /// Returns a globally sortable event order. All event versions must have this
1319    /// field.
1320    pub fn sortable_id(&self) -> u64 {
1321        match self {
1322            VersionedStorageUsage::V1(usage) => usage.id,
1323        }
1324    }
1325}
1326
#[cfg(test)]
mod tests {
    use crate::{EventDetails, EventType, EventV1, IdNameV1, ObjectType, VersionedEvent};

    // Test all versions of events. This test hard codes bytes so that
    // programmers are not able to change data structures here without this test
    // failing. Instead of changing data structures, add new variants.
    //
    // Both directions are checked through the crate's own entry points: an
    // event must serialize to exactly the pinned bytes, and the pinned bytes
    // must deserialize back to an equal event, since readers of previously
    // written data depend on the latter.
    #[mz_ore::test]
    fn test_audit_log() -> Result<(), anyhow::Error> {
        let cases: Vec<(VersionedEvent, &'static str)> = vec![(
            VersionedEvent::V1(EventV1::new(
                2,
                EventType::Drop,
                ObjectType::ClusterReplica,
                EventDetails::IdNameV1(IdNameV1 {
                    id: "u1".to_string(),
                    name: "name".into(),
                }),
                None,
                2,
            )),
            r#"{"V1":{"id":2,"event_type":"drop","object_type":"cluster-replica","details":{"IdNameV1":{"id":"u1","name":"name"}},"user":null,"occurred_at":2}}"#,
        )];

        for (event, expected_bytes) in cases {
            // Writing: the event must produce exactly the pinned bytes.
            let event_bytes = event.serialize();
            assert_eq!(
                event_bytes,
                expected_bytes.as_bytes(),
                "expected bytes {}, got {}",
                expected_bytes,
                std::str::from_utf8(&event_bytes).unwrap(),
            );

            // Reading: the pinned bytes must decode to the same event.
            let decoded = VersionedEvent::deserialize(expected_bytes.as_bytes())?;
            assert_eq!(
                decoded, event,
                "bytes {} did not decode to the expected event",
                expected_bytes,
            );
        }

        Ok(())
    }
}