Skip to main content

mz_audit_log/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Audit log data structures.
11//!
12//! The audit log is logging that is produced by user actions and consumed
13//! by users in the form of the `mz_catalog.mz_audit_events` SQL table and
14//! by the cloud management layer for billing and introspection. This crate
15//! is designed to make the production and consumption of the logs type
16//! safe. Events and their metadata are versioned and the data structures
17//! replicated here so that if the data change in some other crate, a
18//! new version here can be made. This avoids needing to poke at the data
19//! when reading it to determine what it means and should have full backward
20//! compatibility. This is its own crate so that production and consumption can
21//! be in different processes and production is not allowed to specify private
22//! data structures unknown to the reader.
23
24use mz_ore::now::EpochMillis;
25use proptest_derive::Arbitrary;
26use serde::{Deserialize, Serialize};
27
28/// New version variants should be added if fields need to be added, changed, or removed.
29#[derive(
30    Clone,
31    Debug,
32    Serialize,
33    Deserialize,
34    PartialOrd,
35    PartialEq,
36    Eq,
37    Ord,
38    Hash,
39    Arbitrary
40)]
41pub enum VersionedEvent {
42    V1(EventV1),
43}
44
45impl VersionedEvent {
46    /// Create a new event. This function must always require and produce the most
47    /// recent variant of VersionedEvent. `id` must be a globally increasing,
48    /// ordered number such that sorting by it on all events yields the order
49    /// of events by users. It is insufficient to use `occurred_at` (even at
50    /// nanosecond precision) due to clock unpredictability.
51    pub fn new(
52        id: u64,
53        event_type: EventType,
54        object_type: ObjectType,
55        details: EventDetails,
56        user: Option<String>,
57        occurred_at: EpochMillis,
58    ) -> Self {
59        Self::V1(EventV1::new(
60            id,
61            event_type,
62            object_type,
63            details,
64            user,
65            occurred_at,
66        ))
67    }
68
69    // Implement deserialize and serialize so writers and readers don't have to
70    // coordinate about which Serializer to use.
71    pub fn deserialize(data: &[u8]) -> Result<Self, anyhow::Error> {
72        Ok(serde_json::from_slice(data)?)
73    }
74
75    pub fn serialize(&self) -> Vec<u8> {
76        serde_json::to_vec(self).expect("must serialize")
77    }
78
79    /// Returns a globally sortable event order. All event versions must have this
80    /// field.
81    pub fn sortable_id(&self) -> u64 {
82        match self {
83            VersionedEvent::V1(ev) => ev.id,
84        }
85    }
86}
87
88#[derive(
89    Clone,
90    Debug,
91    Serialize,
92    Deserialize,
93    PartialOrd,
94    PartialEq,
95    Eq,
96    Ord,
97    Hash,
98    Arbitrary
99)]
100#[serde(rename_all = "kebab-case")]
101pub enum EventType {
102    Create,
103    Drop,
104    Alter,
105    Grant,
106    Revoke,
107    Comment,
108}
109
110impl EventType {
111    pub fn as_title_case(&self) -> &'static str {
112        match self {
113            EventType::Create => "Created",
114            EventType::Drop => "Dropped",
115            EventType::Alter => "Altered",
116            EventType::Grant => "Granted",
117            EventType::Revoke => "Revoked",
118            EventType::Comment => "Comment",
119        }
120    }
121}
122
123serde_plain::derive_display_from_serialize!(EventType);
124
125#[derive(
126    Clone,
127    Copy,
128    Debug,
129    Serialize,
130    Deserialize,
131    PartialOrd,
132    PartialEq,
133    Eq,
134    Ord,
135    Hash,
136    Arbitrary
137)]
138#[serde(rename_all = "kebab-case")]
139pub enum ObjectType {
140    Cluster,
141    ClusterReplica,
142    Connection,
143    ContinualTask,
144    Database,
145    Func,
146    Index,
147    MaterializedView,
148    NetworkPolicy,
149    Role,
150    Secret,
151    Schema,
152    Sink,
153    Source,
154    System,
155    Table,
156    Type,
157    View,
158}
159
160impl ObjectType {
161    pub fn as_title_case(&self) -> &'static str {
162        match self {
163            ObjectType::Cluster => "Cluster",
164            ObjectType::ClusterReplica => "Cluster Replica",
165            ObjectType::Connection => "Connection",
166            ObjectType::ContinualTask => "Continual Task",
167            ObjectType::Database => "Database",
168            ObjectType::Func => "Function",
169            ObjectType::Index => "Index",
170            ObjectType::MaterializedView => "Materialized View",
171            ObjectType::NetworkPolicy => "Network Policy",
172            ObjectType::Role => "Role",
173            ObjectType::Schema => "Schema",
174            ObjectType::Secret => "Secret",
175            ObjectType::Sink => "Sink",
176            ObjectType::Source => "Source",
177            ObjectType::System => "System",
178            ObjectType::Table => "Table",
179            ObjectType::Type => "Type",
180            ObjectType::View => "View",
181        }
182    }
183}
184
185serde_plain::derive_display_from_serialize!(ObjectType);
186
187#[derive(
188    Clone,
189    Debug,
190    Serialize,
191    Deserialize,
192    PartialOrd,
193    PartialEq,
194    Eq,
195    Ord,
196    Hash,
197    Arbitrary
198)]
199pub enum EventDetails {
200    #[serde(rename = "CreateComputeReplicaV1")] // historical name
201    CreateClusterReplicaV1(CreateClusterReplicaV1),
202    CreateClusterReplicaV2(CreateClusterReplicaV2),
203    CreateClusterReplicaV3(CreateClusterReplicaV3),
204    CreateClusterReplicaV4(CreateClusterReplicaV4),
205    #[serde(rename = "DropComputeReplicaV1")] // historical name
206    DropClusterReplicaV1(DropClusterReplicaV1),
207    DropClusterReplicaV2(DropClusterReplicaV2),
208    DropClusterReplicaV3(DropClusterReplicaV3),
209    CreateSourceSinkV1(CreateSourceSinkV1),
210    CreateSourceSinkV2(CreateSourceSinkV2),
211    CreateSourceSinkV3(CreateSourceSinkV3),
212    CreateSourceSinkV4(CreateSourceSinkV4),
213    CreateIndexV1(CreateIndexV1),
214    CreateMaterializedViewV1(CreateMaterializedViewV1),
215    AlterApplyReplacementV1(AlterApplyReplacementV1),
216    AlterSetClusterV1(AlterSetClusterV1),
217    AlterSourceSinkV1(AlterSourceSinkV1),
218    GrantRoleV1(GrantRoleV1),
219    GrantRoleV2(GrantRoleV2),
220    RevokeRoleV1(RevokeRoleV1),
221    RevokeRoleV2(RevokeRoleV2),
222    UpdatePrivilegeV1(UpdatePrivilegeV1),
223    AlterDefaultPrivilegeV1(AlterDefaultPrivilegeV1),
224    UpdateOwnerV1(UpdateOwnerV1),
225    IdFullNameV1(IdFullNameV1),
226    RenameClusterV1(RenameClusterV1),
227    RenameClusterReplicaV1(RenameClusterReplicaV1),
228    RenameItemV1(RenameItemV1),
229    IdNameV1(IdNameV1),
230    SchemaV1(SchemaV1),
231    SchemaV2(SchemaV2),
232    UpdateItemV1(UpdateItemV1),
233    RenameSchemaV1(RenameSchemaV1),
234    AlterRetainHistoryV1(AlterRetainHistoryV1),
235    AlterAddColumnV1(AlterAddColumnV1),
236    AlterSourceTimestampIntervalV1(AlterSourceTimestampIntervalV1),
237    ToNewIdV1(ToNewIdV1),
238    FromPreviousIdV1(FromPreviousIdV1),
239    SetV1(SetV1),
240    ResetAllV1,
241    RotateKeysV1(RotateKeysV1),
242    CreateRoleV1(CreateRoleV1),
243}
244
245#[derive(
246    Clone,
247    Debug,
248    Serialize,
249    Deserialize,
250    PartialOrd,
251    PartialEq,
252    Eq,
253    Ord,
254    Hash,
255    Arbitrary
256)]
257pub struct SetV1 {
258    pub name: String,
259    pub value: Option<String>,
260}
261
262#[derive(
263    Clone,
264    Debug,
265    Serialize,
266    Deserialize,
267    PartialOrd,
268    PartialEq,
269    Eq,
270    Ord,
271    Hash,
272    Arbitrary
273)]
274pub struct RotateKeysV1 {
275    pub id: String,
276    pub name: String,
277}
278
279#[derive(
280    Clone,
281    Debug,
282    Serialize,
283    Deserialize,
284    PartialOrd,
285    PartialEq,
286    Eq,
287    Ord,
288    Hash,
289    Arbitrary
290)]
291pub struct IdFullNameV1 {
292    pub id: String,
293    #[serde(flatten)]
294    pub name: FullNameV1,
295}
296
297#[derive(
298    Clone,
299    Debug,
300    Serialize,
301    Deserialize,
302    PartialOrd,
303    PartialEq,
304    Eq,
305    Ord,
306    Hash,
307    Arbitrary
308)]
309pub struct FullNameV1 {
310    pub database: String,
311    pub schema: String,
312    pub item: String,
313}
314
315#[derive(
316    Clone,
317    Debug,
318    Serialize,
319    Deserialize,
320    PartialOrd,
321    PartialEq,
322    Eq,
323    Ord,
324    Hash,
325    Arbitrary
326)]
327pub struct IdNameV1 {
328    pub id: String,
329    pub name: String,
330}
331
332#[derive(
333    Clone,
334    Debug,
335    Serialize,
336    Deserialize,
337    PartialOrd,
338    PartialEq,
339    Eq,
340    Ord,
341    Hash,
342    Arbitrary
343)]
344pub struct CreateRoleV1 {
345    pub id: String,
346    pub name: String,
347    pub auto_provision_source: Option<String>,
348}
349
350#[derive(
351    Clone,
352    Debug,
353    Serialize,
354    Deserialize,
355    PartialOrd,
356    PartialEq,
357    Eq,
358    Ord,
359    Hash,
360    Arbitrary
361)]
362pub struct RenameItemV1 {
363    pub id: String,
364    pub old_name: FullNameV1,
365    pub new_name: FullNameV1,
366}
367
368#[derive(
369    Clone,
370    Debug,
371    Serialize,
372    Deserialize,
373    PartialOrd,
374    PartialEq,
375    Eq,
376    Ord,
377    Hash,
378    Arbitrary
379)]
380pub struct RenameClusterV1 {
381    pub id: String,
382    pub old_name: String,
383    pub new_name: String,
384}
385
386#[derive(
387    Clone,
388    Debug,
389    Serialize,
390    Deserialize,
391    PartialOrd,
392    PartialEq,
393    Eq,
394    Ord,
395    Hash,
396    Arbitrary
397)]
398pub struct RenameClusterReplicaV1 {
399    pub cluster_id: String,
400    pub replica_id: String,
401    pub old_name: String,
402    pub new_name: String,
403}
404
405#[derive(
406    Clone,
407    Debug,
408    Serialize,
409    Deserialize,
410    PartialOrd,
411    PartialEq,
412    Eq,
413    Ord,
414    Hash,
415    Arbitrary
416)]
417pub struct DropClusterReplicaV1 {
418    pub cluster_id: String,
419    pub cluster_name: String,
420    // Events that predate v0.32.0 will not have this field set.
421    #[serde(skip_serializing_if = "Option::is_none")]
422    pub replica_id: Option<String>,
423    pub replica_name: String,
424}
425
426#[derive(
427    Clone,
428    Debug,
429    Serialize,
430    Deserialize,
431    PartialOrd,
432    PartialEq,
433    Eq,
434    Ord,
435    Hash,
436    Arbitrary
437)]
438pub struct DropClusterReplicaV2 {
439    pub cluster_id: String,
440    pub cluster_name: String,
441    pub replica_id: Option<String>,
442    pub replica_name: String,
443    pub reason: CreateOrDropClusterReplicaReasonV1,
444    #[serde(skip_serializing_if = "Option::is_none")]
445    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
446}
447
448#[derive(
449    Clone,
450    Debug,
451    Serialize,
452    Deserialize,
453    PartialOrd,
454    PartialEq,
455    Eq,
456    Ord,
457    Hash,
458    Arbitrary
459)]
460pub struct DropClusterReplicaV3 {
461    pub cluster_id: String,
462    pub cluster_name: String,
463    pub replica_id: Option<String>,
464    pub replica_name: String,
465    pub reason: CreateOrDropClusterReplicaReasonV1,
466    #[serde(skip_serializing_if = "Option::is_none")]
467    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
468}
469
470#[derive(
471    Clone,
472    Debug,
473    Serialize,
474    Deserialize,
475    PartialOrd,
476    PartialEq,
477    Eq,
478    Ord,
479    Hash,
480    Arbitrary
481)]
482pub struct CreateClusterReplicaV1 {
483    pub cluster_id: String,
484    pub cluster_name: String,
485    // Events that predate v0.32.0 will not have this field set.
486    #[serde(skip_serializing_if = "Option::is_none")]
487    pub replica_id: Option<String>,
488    pub replica_name: String,
489    pub logical_size: String,
490    pub disk: bool,
491    pub billed_as: Option<String>,
492    pub internal: bool,
493}
494
495#[derive(
496    Clone,
497    Debug,
498    Serialize,
499    Deserialize,
500    PartialOrd,
501    PartialEq,
502    Eq,
503    Ord,
504    Hash,
505    Arbitrary
506)]
507pub struct CreateClusterReplicaV2 {
508    pub cluster_id: String,
509    pub cluster_name: String,
510    pub replica_id: Option<String>,
511    pub replica_name: String,
512    pub logical_size: String,
513    pub disk: bool,
514    pub billed_as: Option<String>,
515    pub internal: bool,
516    pub reason: CreateOrDropClusterReplicaReasonV1,
517    #[serde(skip_serializing_if = "Option::is_none")]
518    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
519}
520
521#[derive(
522    Clone,
523    Debug,
524    Serialize,
525    Deserialize,
526    PartialOrd,
527    PartialEq,
528    Eq,
529    Ord,
530    Hash,
531    Arbitrary
532)]
533pub struct CreateClusterReplicaV3 {
534    pub cluster_id: String,
535    pub cluster_name: String,
536    pub replica_id: Option<String>,
537    pub replica_name: String,
538    pub logical_size: String,
539    pub disk: bool,
540    pub billed_as: Option<String>,
541    pub internal: bool,
542    pub reason: CreateOrDropClusterReplicaReasonV1,
543    #[serde(skip_serializing_if = "Option::is_none")]
544    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
545}
546
547#[derive(
548    Clone,
549    Debug,
550    Serialize,
551    Deserialize,
552    PartialOrd,
553    PartialEq,
554    Eq,
555    Ord,
556    Hash,
557    Arbitrary
558)]
559pub struct CreateClusterReplicaV4 {
560    pub cluster_id: String,
561    pub cluster_name: String,
562    pub replica_id: Option<String>,
563    pub replica_name: String,
564    pub logical_size: String,
565    pub billed_as: Option<String>,
566    pub internal: bool,
567    pub reason: CreateOrDropClusterReplicaReasonV1,
568    #[serde(skip_serializing_if = "Option::is_none")]
569    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
570}
571
572#[derive(
573    Clone,
574    Debug,
575    Serialize,
576    Deserialize,
577    PartialOrd,
578    PartialEq,
579    Eq,
580    Ord,
581    Hash,
582    Arbitrary
583)]
584#[serde(rename_all = "kebab-case")]
585pub enum CreateOrDropClusterReplicaReasonV1 {
586    Manual,
587    Schedule,
588    System,
589}
590
591/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
592/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
593/// can be settings of the policy as well as other information about the state of the system.)
594#[derive(
595    Clone,
596    Debug,
597    Serialize,
598    Deserialize,
599    PartialOrd,
600    PartialEq,
601    Eq,
602    Ord,
603    Hash,
604    Arbitrary
605)]
606pub struct SchedulingDecisionsWithReasonsV1 {
607    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
608    pub on_refresh: RefreshDecisionWithReasonV1,
609}
610
611/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
612/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
613/// can be settings of the policy as well as other information about the state of the system.)
614#[derive(
615    Clone,
616    Debug,
617    Serialize,
618    Deserialize,
619    PartialOrd,
620    PartialEq,
621    Eq,
622    Ord,
623    Hash,
624    Arbitrary
625)]
626pub struct SchedulingDecisionsWithReasonsV2 {
627    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
628    pub on_refresh: RefreshDecisionWithReasonV2,
629}
630
631#[derive(
632    Clone,
633    Debug,
634    Serialize,
635    Deserialize,
636    PartialOrd,
637    PartialEq,
638    Eq,
639    Ord,
640    Hash,
641    Arbitrary
642)]
643pub struct RefreshDecisionWithReasonV1 {
644    pub decision: SchedulingDecisionV1,
645    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
646    /// time estimate).
647    pub objects_needing_refresh: Vec<String>,
648    /// The HYDRATION TIME ESTIMATE setting of the cluster.
649    pub hydration_time_estimate: String,
650}
651
652#[derive(
653    Clone,
654    Debug,
655    Serialize,
656    Deserialize,
657    PartialOrd,
658    PartialEq,
659    Eq,
660    Ord,
661    Hash,
662    Arbitrary
663)]
664pub struct RefreshDecisionWithReasonV2 {
665    pub decision: SchedulingDecisionV1,
666    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
667    /// time estimate), and therefore should keep the cluster On.
668    pub objects_needing_refresh: Vec<String>,
669    /// Objects for which we estimate that they currently need Persist compaction, and therefore
670    /// should keep the cluster On.
671    pub objects_needing_compaction: Vec<String>,
672    /// The HYDRATION TIME ESTIMATE setting of the cluster.
673    pub hydration_time_estimate: String,
674}
675
676#[derive(
677    Clone,
678    Debug,
679    Serialize,
680    Deserialize,
681    PartialOrd,
682    PartialEq,
683    Eq,
684    Ord,
685    Hash,
686    Arbitrary
687)]
688#[serde(rename_all = "kebab-case")]
689pub enum SchedulingDecisionV1 {
690    On,
691    Off,
692}
693
694impl From<bool> for SchedulingDecisionV1 {
695    fn from(value: bool) -> Self {
696        match value {
697            true => SchedulingDecisionV1::On,
698            false => SchedulingDecisionV1::Off,
699        }
700    }
701}
702
703#[derive(
704    Clone,
705    Debug,
706    Serialize,
707    Deserialize,
708    PartialOrd,
709    PartialEq,
710    Eq,
711    Ord,
712    Hash,
713    Arbitrary
714)]
715pub struct CreateSourceSinkV1 {
716    pub id: String,
717    #[serde(flatten)]
718    pub name: FullNameV1,
719    pub size: Option<String>,
720}
721
722#[derive(
723    Clone,
724    Debug,
725    Serialize,
726    Deserialize,
727    PartialOrd,
728    PartialEq,
729    Eq,
730    Ord,
731    Hash,
732    Arbitrary
733)]
734pub struct CreateSourceSinkV2 {
735    pub id: String,
736    #[serde(flatten)]
737    pub name: FullNameV1,
738    pub size: Option<String>,
739    #[serde(rename = "type")]
740    pub external_type: String,
741}
742
743#[derive(
744    Clone,
745    Debug,
746    Serialize,
747    Deserialize,
748    PartialOrd,
749    PartialEq,
750    Eq,
751    Ord,
752    Hash,
753    Arbitrary
754)]
755pub struct CreateSourceSinkV3 {
756    pub id: String,
757    #[serde(flatten)]
758    pub name: FullNameV1,
759    #[serde(rename = "type")]
760    pub external_type: String,
761}
762
763#[derive(
764    Clone,
765    Debug,
766    Serialize,
767    Deserialize,
768    PartialOrd,
769    PartialEq,
770    Eq,
771    Ord,
772    Hash,
773    Arbitrary
774)]
775pub struct CreateSourceSinkV4 {
776    pub id: String,
777    pub cluster_id: Option<String>,
778    #[serde(flatten)]
779    pub name: FullNameV1,
780    #[serde(rename = "type")]
781    pub external_type: String,
782}
783
784#[derive(
785    Clone,
786    Debug,
787    Serialize,
788    Deserialize,
789    PartialOrd,
790    PartialEq,
791    Eq,
792    Ord,
793    Hash,
794    Arbitrary
795)]
796pub struct CreateIndexV1 {
797    pub id: String,
798    pub cluster_id: String,
799    #[serde(flatten)]
800    pub name: FullNameV1,
801}
802
803#[derive(
804    Clone,
805    Debug,
806    Serialize,
807    Deserialize,
808    PartialOrd,
809    PartialEq,
810    Eq,
811    Ord,
812    Hash,
813    Arbitrary
814)]
815pub struct CreateMaterializedViewV1 {
816    pub id: String,
817    pub cluster_id: String,
818    #[serde(flatten)]
819    pub name: FullNameV1,
820    #[serde(skip_serializing_if = "Option::is_none")]
821    pub replacement_target_id: Option<String>,
822}
823
824#[derive(
825    Clone,
826    Debug,
827    Serialize,
828    Deserialize,
829    PartialOrd,
830    PartialEq,
831    Eq,
832    Ord,
833    Hash,
834    Arbitrary
835)]
836pub struct AlterApplyReplacementV1 {
837    #[serde(flatten)]
838    pub target: IdFullNameV1,
839    pub replacement: IdFullNameV1,
840}
841
842#[derive(
843    Clone,
844    Debug,
845    Serialize,
846    Deserialize,
847    PartialOrd,
848    PartialEq,
849    Eq,
850    Ord,
851    Hash,
852    Arbitrary
853)]
854pub struct AlterSourceSinkV1 {
855    pub id: String,
856    #[serde(flatten)]
857    pub name: FullNameV1,
858    pub old_size: Option<String>,
859    pub new_size: Option<String>,
860}
861
862#[derive(
863    Clone,
864    Debug,
865    Serialize,
866    Deserialize,
867    PartialOrd,
868    PartialEq,
869    Eq,
870    Ord,
871    Hash,
872    Arbitrary
873)]
874pub struct AlterSetClusterV1 {
875    pub id: String,
876    #[serde(flatten)]
877    pub name: FullNameV1,
878    pub old_cluster_id: String,
879    pub new_cluster_id: String,
880}
881
882#[derive(
883    Clone,
884    Debug,
885    Serialize,
886    Deserialize,
887    PartialOrd,
888    PartialEq,
889    Eq,
890    Ord,
891    Hash,
892    Arbitrary
893)]
894pub struct GrantRoleV1 {
895    pub role_id: String,
896    pub member_id: String,
897    pub grantor_id: String,
898}
899
900#[derive(
901    Clone,
902    Debug,
903    Serialize,
904    Deserialize,
905    PartialOrd,
906    PartialEq,
907    Eq,
908    Ord,
909    Hash,
910    Arbitrary
911)]
912pub struct GrantRoleV2 {
913    pub role_id: String,
914    pub member_id: String,
915    pub grantor_id: String,
916    pub executed_by: String,
917}
918
919#[derive(
920    Clone,
921    Debug,
922    Serialize,
923    Deserialize,
924    PartialOrd,
925    PartialEq,
926    Eq,
927    Ord,
928    Hash,
929    Arbitrary
930)]
931pub struct RevokeRoleV1 {
932    pub role_id: String,
933    pub member_id: String,
934}
935
936#[derive(
937    Clone,
938    Debug,
939    Serialize,
940    Deserialize,
941    PartialOrd,
942    PartialEq,
943    Eq,
944    Ord,
945    Hash,
946    Arbitrary
947)]
948pub struct RevokeRoleV2 {
949    pub role_id: String,
950    pub member_id: String,
951    pub grantor_id: String,
952    pub executed_by: String,
953}
954
955#[derive(
956    Clone,
957    Debug,
958    Serialize,
959    Deserialize,
960    PartialOrd,
961    PartialEq,
962    Eq,
963    Ord,
964    Hash,
965    Arbitrary
966)]
967pub struct UpdatePrivilegeV1 {
968    pub object_id: String,
969    pub grantee_id: String,
970    pub grantor_id: String,
971    pub privileges: String,
972}
973
974#[derive(
975    Clone,
976    Debug,
977    Serialize,
978    Deserialize,
979    PartialOrd,
980    PartialEq,
981    Eq,
982    Ord,
983    Hash,
984    Arbitrary
985)]
986pub struct AlterDefaultPrivilegeV1 {
987    pub role_id: String,
988    pub database_id: Option<String>,
989    pub schema_id: Option<String>,
990    pub grantee_id: String,
991    pub privileges: String,
992}
993
994#[derive(
995    Clone,
996    Debug,
997    Serialize,
998    Deserialize,
999    PartialOrd,
1000    PartialEq,
1001    Eq,
1002    Ord,
1003    Hash,
1004    Arbitrary
1005)]
1006pub struct UpdateOwnerV1 {
1007    pub object_id: String,
1008    pub old_owner_id: String,
1009    pub new_owner_id: String,
1010}
1011
1012#[derive(
1013    Clone,
1014    Debug,
1015    Serialize,
1016    Deserialize,
1017    PartialOrd,
1018    PartialEq,
1019    Eq,
1020    Ord,
1021    Hash,
1022    Arbitrary
1023)]
1024pub struct SchemaV1 {
1025    pub id: String,
1026    pub name: String,
1027    pub database_name: String,
1028}
1029
1030#[derive(
1031    Clone,
1032    Debug,
1033    Serialize,
1034    Deserialize,
1035    PartialOrd,
1036    PartialEq,
1037    Eq,
1038    Ord,
1039    Hash,
1040    Arbitrary
1041)]
1042pub struct SchemaV2 {
1043    pub id: String,
1044    pub name: String,
1045    pub database_name: Option<String>,
1046}
1047
1048#[derive(
1049    Clone,
1050    Debug,
1051    Serialize,
1052    Deserialize,
1053    PartialOrd,
1054    PartialEq,
1055    Eq,
1056    Ord,
1057    Hash,
1058    Arbitrary
1059)]
1060pub struct RenameSchemaV1 {
1061    pub id: String,
1062    pub database_name: Option<String>,
1063    pub old_name: String,
1064    pub new_name: String,
1065}
1066
1067#[derive(
1068    Clone,
1069    Debug,
1070    Serialize,
1071    Deserialize,
1072    PartialOrd,
1073    PartialEq,
1074    Eq,
1075    Ord,
1076    Hash,
1077    Arbitrary
1078)]
1079pub struct AlterRetainHistoryV1 {
1080    pub id: String,
1081    pub old_history: Option<String>,
1082    pub new_history: Option<String>,
1083}
1084
1085#[derive(
1086    Clone,
1087    Debug,
1088    Serialize,
1089    Deserialize,
1090    PartialOrd,
1091    PartialEq,
1092    Eq,
1093    Ord,
1094    Hash,
1095    Arbitrary
1096)]
1097pub struct AlterAddColumnV1 {
1098    pub id: String,
1099    pub column: String,
1100    pub column_type: String,
1101    pub nullable: bool,
1102}
1103
1104#[derive(
1105    Clone,
1106    Debug,
1107    Serialize,
1108    Deserialize,
1109    PartialOrd,
1110    PartialEq,
1111    Eq,
1112    Ord,
1113    Hash,
1114    Arbitrary
1115)]
1116pub struct AlterSourceTimestampIntervalV1 {
1117    pub id: String,
1118    pub old_interval: Option<String>,
1119    pub new_interval: Option<String>,
1120}
1121
1122#[derive(
1123    Clone,
1124    Debug,
1125    Serialize,
1126    Deserialize,
1127    PartialOrd,
1128    PartialEq,
1129    Eq,
1130    Ord,
1131    Hash,
1132    Arbitrary
1133)]
1134pub struct UpdateItemV1 {
1135    pub id: String,
1136    #[serde(flatten)]
1137    pub name: FullNameV1,
1138}
1139
1140#[derive(
1141    Clone,
1142    Debug,
1143    Serialize,
1144    Deserialize,
1145    PartialOrd,
1146    PartialEq,
1147    Eq,
1148    Ord,
1149    Hash,
1150    Arbitrary
1151)]
1152pub struct ToNewIdV1 {
1153    pub id: String,
1154    pub new_id: String,
1155}
1156
1157#[derive(
1158    Clone,
1159    Debug,
1160    Serialize,
1161    Deserialize,
1162    PartialOrd,
1163    PartialEq,
1164    Eq,
1165    Ord,
1166    Hash,
1167    Arbitrary
1168)]
1169pub struct FromPreviousIdV1 {
1170    pub id: String,
1171    pub previous_id: String,
1172}
1173
1174impl EventDetails {
1175    pub fn as_json(&self) -> serde_json::Value {
1176        match self {
1177            EventDetails::CreateClusterReplicaV1(v) => {
1178                serde_json::to_value(v).expect("must serialize")
1179            }
1180            EventDetails::CreateClusterReplicaV2(v) => {
1181                serde_json::to_value(v).expect("must serialize")
1182            }
1183            EventDetails::CreateClusterReplicaV3(v) => {
1184                serde_json::to_value(v).expect("must serialize")
1185            }
1186            EventDetails::CreateClusterReplicaV4(v) => {
1187                serde_json::to_value(v).expect("must serialize")
1188            }
1189            EventDetails::DropClusterReplicaV1(v) => {
1190                serde_json::to_value(v).expect("must serialize")
1191            }
1192            EventDetails::DropClusterReplicaV2(v) => {
1193                serde_json::to_value(v).expect("must serialize")
1194            }
1195            EventDetails::DropClusterReplicaV3(v) => {
1196                serde_json::to_value(v).expect("must serialize")
1197            }
1198            EventDetails::IdFullNameV1(v) => serde_json::to_value(v).expect("must serialize"),
1199            EventDetails::RenameClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
1200            EventDetails::RenameClusterReplicaV1(v) => {
1201                serde_json::to_value(v).expect("must serialize")
1202            }
1203            EventDetails::RenameItemV1(v) => serde_json::to_value(v).expect("must serialize"),
1204            EventDetails::IdNameV1(v) => serde_json::to_value(v).expect("must serialize"),
1205            EventDetails::SchemaV1(v) => serde_json::to_value(v).expect("must serialize"),
1206            EventDetails::SchemaV2(v) => serde_json::to_value(v).expect("must serialize"),
1207            EventDetails::RenameSchemaV1(v) => serde_json::to_value(v).expect("must serialize"),
1208            EventDetails::CreateSourceSinkV1(v) => serde_json::to_value(v).expect("must serialize"),
1209            EventDetails::CreateSourceSinkV2(v) => serde_json::to_value(v).expect("must serialize"),
1210            EventDetails::CreateSourceSinkV3(v) => serde_json::to_value(v).expect("must serialize"),
1211            EventDetails::CreateSourceSinkV4(v) => serde_json::to_value(v).expect("must serialize"),
1212            EventDetails::CreateIndexV1(v) => serde_json::to_value(v).expect("must serialize"),
1213            EventDetails::CreateMaterializedViewV1(v) => {
1214                serde_json::to_value(v).expect("must serialize")
1215            }
1216            EventDetails::AlterApplyReplacementV1(v) => {
1217                serde_json::to_value(v).expect("must serialize")
1218            }
1219            EventDetails::AlterSourceSinkV1(v) => serde_json::to_value(v).expect("must serialize"),
1220            EventDetails::AlterSetClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
1221            EventDetails::GrantRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
1222            EventDetails::GrantRoleV2(v) => serde_json::to_value(v).expect("must serialize"),
1223            EventDetails::RevokeRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
1224            EventDetails::RevokeRoleV2(v) => serde_json::to_value(v).expect("must serialize"),
1225            EventDetails::UpdatePrivilegeV1(v) => serde_json::to_value(v).expect("must serialize"),
1226            EventDetails::AlterDefaultPrivilegeV1(v) => {
1227                serde_json::to_value(v).expect("must serialize")
1228            }
1229            EventDetails::UpdateOwnerV1(v) => serde_json::to_value(v).expect("must serialize"),
1230            EventDetails::UpdateItemV1(v) => serde_json::to_value(v).expect("must serialize"),
1231            EventDetails::AlterRetainHistoryV1(v) => {
1232                serde_json::to_value(v).expect("must serialize")
1233            }
1234            EventDetails::AlterAddColumnV1(v) => serde_json::to_value(v).expect("must serialize"),
1235            EventDetails::AlterSourceTimestampIntervalV1(v) => {
1236                serde_json::to_value(v).expect("must serialize")
1237            }
1238            EventDetails::ToNewIdV1(v) => serde_json::to_value(v).expect("must serialize"),
1239            EventDetails::FromPreviousIdV1(v) => serde_json::to_value(v).expect("must serialize"),
1240            EventDetails::SetV1(v) => serde_json::to_value(v).expect("must serialize"),
1241            EventDetails::ResetAllV1 => serde_json::Value::Null,
1242            EventDetails::RotateKeysV1(v) => serde_json::to_value(v).expect("must serialize"),
1243            EventDetails::CreateRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
1244        }
1245    }
1246}
1247
1248#[derive(
1249    Clone,
1250    Debug,
1251    Serialize,
1252    Deserialize,
1253    PartialOrd,
1254    PartialEq,
1255    Eq,
1256    Ord,
1257    Hash,
1258    Arbitrary
1259)]
1260pub struct EventV1 {
1261    pub id: u64,
1262    pub event_type: EventType,
1263    pub object_type: ObjectType,
1264    pub details: EventDetails,
1265    pub user: Option<String>,
1266    pub occurred_at: EpochMillis,
1267}
1268
1269impl EventV1 {
1270    fn new(
1271        id: u64,
1272        event_type: EventType,
1273        object_type: ObjectType,
1274        details: EventDetails,
1275        user: Option<String>,
1276        occurred_at: EpochMillis,
1277    ) -> EventV1 {
1278        EventV1 {
1279            id,
1280            event_type,
1281            object_type,
1282            details,
1283            user,
1284            occurred_at,
1285        }
1286    }
1287}
1288
1289#[derive(
1290    Clone,
1291    Debug,
1292    Serialize,
1293    Deserialize,
1294    PartialOrd,
1295    PartialEq,
1296    Eq,
1297    Ord,
1298    Hash,
1299    Arbitrary
1300)]
1301pub struct StorageUsageV1 {
1302    pub id: u64,
1303    pub shard_id: Option<String>,
1304    pub size_bytes: u64,
1305    pub collection_timestamp: EpochMillis,
1306}
1307
1308impl StorageUsageV1 {
1309    pub fn new(
1310        id: u64,
1311        shard_id: Option<String>,
1312        size_bytes: u64,
1313        collection_timestamp: EpochMillis,
1314    ) -> StorageUsageV1 {
1315        StorageUsageV1 {
1316            id,
1317            shard_id,
1318            size_bytes,
1319            collection_timestamp,
1320        }
1321    }
1322}
1323
1324/// Describes the environment's storage usage at a point in time.
1325///
1326/// This type is persisted in the catalog across restarts, so any updates to the
1327/// schema will require a new version.
1328#[derive(
1329    Clone,
1330    Debug,
1331    Serialize,
1332    Deserialize,
1333    PartialOrd,
1334    PartialEq,
1335    Eq,
1336    Ord,
1337    Hash,
1338    Arbitrary
1339)]
1340pub enum VersionedStorageUsage {
1341    V1(StorageUsageV1),
1342}
1343
1344impl VersionedStorageUsage {
1345    /// Create a new metric snapshot.
1346    /// This function must always require and produce the most
1347    /// recent variant of VersionedStorageMetrics.
1348    pub fn new(
1349        id: u64,
1350        object_id: Option<String>,
1351        size_bytes: u64,
1352        collection_timestamp: EpochMillis,
1353    ) -> Self {
1354        Self::V1(StorageUsageV1::new(
1355            id,
1356            object_id,
1357            size_bytes,
1358            collection_timestamp,
1359        ))
1360    }
1361
1362    // Implement deserialize and serialize so writers and readers don't have to
1363    // coordinate about which Serializer to use.
1364    pub fn deserialize(data: &[u8]) -> Result<Self, anyhow::Error> {
1365        Ok(serde_json::from_slice(data)?)
1366    }
1367
1368    pub fn serialize(&self) -> Vec<u8> {
1369        serde_json::to_vec(self).expect("must serialize")
1370    }
1371
1372    pub fn timestamp(&self) -> EpochMillis {
1373        match self {
1374            VersionedStorageUsage::V1(StorageUsageV1 {
1375                collection_timestamp,
1376                ..
1377            }) => *collection_timestamp,
1378        }
1379    }
1380
1381    /// Returns a globally sortable event order. All event versions must have this
1382    /// field.
1383    pub fn sortable_id(&self) -> u64 {
1384        match self {
1385            VersionedStorageUsage::V1(usage) => usage.id,
1386        }
1387    }
1388}
1389
1390#[cfg(test)]
1391mod tests {
1392    use crate::{EventDetails, EventType, EventV1, IdNameV1, ObjectType, VersionedEvent};
1393
1394    // Test all versions of events. This test hard codes bytes so that
1395    // programmers are not able to change data structures here without this test
1396    // failing. Instead of changing data structures, add new variants.
1397    #[mz_ore::test]
1398    fn test_audit_log() -> Result<(), anyhow::Error> {
1399        let cases: Vec<(VersionedEvent, &'static str)> = vec![(
1400            VersionedEvent::V1(EventV1::new(
1401                2,
1402                EventType::Drop,
1403                ObjectType::ClusterReplica,
1404                EventDetails::IdNameV1(IdNameV1 {
1405                    id: "u1".to_string(),
1406                    name: "name".into(),
1407                }),
1408                None,
1409                2,
1410            )),
1411            r#"{"V1":{"id":2,"event_type":"drop","object_type":"cluster-replica","details":{"IdNameV1":{"id":"u1","name":"name"}},"user":null,"occurred_at":2}}"#,
1412        )];
1413
1414        for (event, expected_bytes) in cases {
1415            let event_bytes = serde_json::to_vec(&event).unwrap();
1416            assert_eq!(
1417                event_bytes,
1418                expected_bytes.as_bytes(),
1419                "expected bytes {}, got {}",
1420                expected_bytes,
1421                std::str::from_utf8(&event_bytes).unwrap(),
1422            );
1423        }
1424
1425        Ok(())
1426    }
1427}