Skip to main content

mz_audit_log/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Audit log data structures.
11//!
12//! The audit log is logging that is produced by user actions and consumed
13//! by users in the form of the `mz_catalog.mz_audit_events` SQL table and
14//! by the cloud management layer for billing and introspection. This crate
15//! is designed to make the production and consumption of the logs type
16//! safe. Events and their metadata are versioned and the data structures
17//! replicated here so that if the data change in some other crate, a
18//! new version here can be made. This avoids needing to poke at the data
19//! when reading it to determine what it means and should have full backward
20//! compatibility. This is its own crate so that production and consumption can
21//! be in different processes and production is not allowed to specify private
22//! data structures unknown to the reader.
23
24use std::time::Duration;
25
26use mz_ore::now::EpochMillis;
27use proptest_derive::Arbitrary;
28use serde::{Deserialize, Serialize};
29
30/// New version variants should be added if fields need to be added, changed, or removed.
31#[derive(
32    Clone,
33    Debug,
34    Serialize,
35    Deserialize,
36    PartialOrd,
37    PartialEq,
38    Eq,
39    Ord,
40    Hash,
41    Arbitrary
42)]
43pub enum VersionedEvent {
44    V1(EventV1),
45}
46
47impl VersionedEvent {
48    /// Create a new event. This function must always require and produce the most
49    /// recent variant of VersionedEvent. `id` must be a globally increasing,
50    /// ordered number such that sorting by it on all events yields the order
51    /// of events by users. It is insufficient to use `occurred_at` (even at
52    /// nanosecond precision) due to clock unpredictability.
53    pub fn new(
54        id: u64,
55        event_type: EventType,
56        object_type: ObjectType,
57        details: EventDetails,
58        user: Option<String>,
59        occurred_at: EpochMillis,
60    ) -> Self {
61        Self::V1(EventV1::new(
62            id,
63            event_type,
64            object_type,
65            details,
66            user,
67            occurred_at,
68        ))
69    }
70
71    // Implement deserialize and serialize so writers and readers don't have to
72    // coordinate about which Serializer to use.
73    pub fn deserialize(data: &[u8]) -> Result<Self, anyhow::Error> {
74        Ok(serde_json::from_slice(data)?)
75    }
76
77    pub fn serialize(&self) -> Vec<u8> {
78        serde_json::to_vec(self).expect("must serialize")
79    }
80
81    /// Returns a globally sortable event order. All event versions must have this
82    /// field.
83    pub fn sortable_id(&self) -> u64 {
84        match self {
85            VersionedEvent::V1(ev) => ev.id,
86        }
87    }
88}
89
90#[derive(
91    Clone,
92    Debug,
93    Serialize,
94    Deserialize,
95    PartialOrd,
96    PartialEq,
97    Eq,
98    Ord,
99    Hash,
100    Arbitrary
101)]
102#[serde(rename_all = "kebab-case")]
103pub enum EventType {
104    Create,
105    Drop,
106    Alter,
107    Grant,
108    Revoke,
109    Comment,
110}
111
112impl EventType {
113    pub fn as_title_case(&self) -> &'static str {
114        match self {
115            EventType::Create => "Created",
116            EventType::Drop => "Dropped",
117            EventType::Alter => "Altered",
118            EventType::Grant => "Granted",
119            EventType::Revoke => "Revoked",
120            EventType::Comment => "Comment",
121        }
122    }
123}
124
125serde_plain::derive_display_from_serialize!(EventType);
126
127#[derive(
128    Clone,
129    Copy,
130    Debug,
131    Serialize,
132    Deserialize,
133    PartialOrd,
134    PartialEq,
135    Eq,
136    Ord,
137    Hash,
138    Arbitrary
139)]
140#[serde(rename_all = "kebab-case")]
141pub enum ObjectType {
142    Cluster,
143    ClusterReplica,
144    Connection,
145    ContinualTask,
146    Database,
147    Func,
148    Index,
149    MaterializedView,
150    NetworkPolicy,
151    Role,
152    Secret,
153    Schema,
154    Sink,
155    Source,
156    System,
157    Table,
158    Type,
159    View,
160}
161
162impl ObjectType {
163    pub fn as_title_case(&self) -> &'static str {
164        match self {
165            ObjectType::Cluster => "Cluster",
166            ObjectType::ClusterReplica => "Cluster Replica",
167            ObjectType::Connection => "Connection",
168            ObjectType::ContinualTask => "Continual Task",
169            ObjectType::Database => "Database",
170            ObjectType::Func => "Function",
171            ObjectType::Index => "Index",
172            ObjectType::MaterializedView => "Materialized View",
173            ObjectType::NetworkPolicy => "Network Policy",
174            ObjectType::Role => "Role",
175            ObjectType::Schema => "Schema",
176            ObjectType::Secret => "Secret",
177            ObjectType::Sink => "Sink",
178            ObjectType::Source => "Source",
179            ObjectType::System => "System",
180            ObjectType::Table => "Table",
181            ObjectType::Type => "Type",
182            ObjectType::View => "View",
183        }
184    }
185}
186
187serde_plain::derive_display_from_serialize!(ObjectType);
188
189#[derive(
190    Clone,
191    Debug,
192    Serialize,
193    Deserialize,
194    PartialOrd,
195    PartialEq,
196    Eq,
197    Ord,
198    Hash,
199    Arbitrary
200)]
201pub enum EventDetails {
202    #[serde(rename = "CreateComputeReplicaV1")] // historical name
203    CreateClusterReplicaV1(CreateClusterReplicaV1),
204    CreateClusterReplicaV2(CreateClusterReplicaV2),
205    CreateClusterReplicaV3(CreateClusterReplicaV3),
206    CreateClusterReplicaV4(CreateClusterReplicaV4),
207    #[serde(rename = "DropComputeReplicaV1")] // historical name
208    DropClusterReplicaV1(DropClusterReplicaV1),
209    DropClusterReplicaV2(DropClusterReplicaV2),
210    DropClusterReplicaV3(DropClusterReplicaV3),
211    CreateSourceSinkV1(CreateSourceSinkV1),
212    CreateSourceSinkV2(CreateSourceSinkV2),
213    CreateSourceSinkV3(CreateSourceSinkV3),
214    CreateSourceSinkV4(CreateSourceSinkV4),
215    CreateIndexV1(CreateIndexV1),
216    CreateMaterializedViewV1(CreateMaterializedViewV1),
217    AlterApplyReplacementV1(AlterApplyReplacementV1),
218    AlterSetClusterV1(AlterSetClusterV1),
219    AlterSourceSinkV1(AlterSourceSinkV1),
220    GrantRoleV1(GrantRoleV1),
221    GrantRoleV2(GrantRoleV2),
222    RevokeRoleV1(RevokeRoleV1),
223    RevokeRoleV2(RevokeRoleV2),
224    UpdatePrivilegeV1(UpdatePrivilegeV1),
225    AlterDefaultPrivilegeV1(AlterDefaultPrivilegeV1),
226    UpdateOwnerV1(UpdateOwnerV1),
227    IdFullNameV1(IdFullNameV1),
228    RenameClusterV1(RenameClusterV1),
229    RenameClusterReplicaV1(RenameClusterReplicaV1),
230    AlterClusterReconfigurationV1(AlterClusterReconfigurationV1),
231    ClusterHydrationBurstV1(ClusterHydrationBurstV1),
232    RenameItemV1(RenameItemV1),
233    IdNameV1(IdNameV1),
234    SchemaV1(SchemaV1),
235    SchemaV2(SchemaV2),
236    UpdateItemV1(UpdateItemV1),
237    RenameSchemaV1(RenameSchemaV1),
238    AlterRetainHistoryV1(AlterRetainHistoryV1),
239    AlterAddColumnV1(AlterAddColumnV1),
240    AlterSourceTimestampIntervalV1(AlterSourceTimestampIntervalV1),
241    ToNewIdV1(ToNewIdV1),
242    FromPreviousIdV1(FromPreviousIdV1),
243    SetV1(SetV1),
244    ResetAllV1,
245    RotateKeysV1(RotateKeysV1),
246    CreateRoleV1(CreateRoleV1),
247}
248
249#[derive(
250    Clone,
251    Debug,
252    Serialize,
253    Deserialize,
254    PartialOrd,
255    PartialEq,
256    Eq,
257    Ord,
258    Hash,
259    Arbitrary
260)]
261pub struct SetV1 {
262    pub name: String,
263    pub value: Option<String>,
264}
265
266#[derive(
267    Clone,
268    Debug,
269    Serialize,
270    Deserialize,
271    PartialOrd,
272    PartialEq,
273    Eq,
274    Ord,
275    Hash,
276    Arbitrary
277)]
278pub struct RotateKeysV1 {
279    pub id: String,
280    pub name: String,
281}
282
283#[derive(
284    Clone,
285    Debug,
286    Serialize,
287    Deserialize,
288    PartialOrd,
289    PartialEq,
290    Eq,
291    Ord,
292    Hash,
293    Arbitrary
294)]
295pub struct IdFullNameV1 {
296    pub id: String,
297    #[serde(flatten)]
298    pub name: FullNameV1,
299}
300
301#[derive(
302    Clone,
303    Debug,
304    Serialize,
305    Deserialize,
306    PartialOrd,
307    PartialEq,
308    Eq,
309    Ord,
310    Hash,
311    Arbitrary
312)]
313pub struct FullNameV1 {
314    pub database: String,
315    pub schema: String,
316    pub item: String,
317}
318
319#[derive(
320    Clone,
321    Debug,
322    Serialize,
323    Deserialize,
324    PartialOrd,
325    PartialEq,
326    Eq,
327    Ord,
328    Hash,
329    Arbitrary
330)]
331pub struct IdNameV1 {
332    pub id: String,
333    pub name: String,
334}
335
336/// A transition in the lifecycle of a background cluster reconfiguration (a
337/// `reconfiguration` record on a managed cluster), recorded so an operator can
338/// trace a background `ALTER CLUSTER` from start to its resolution.
339///
340/// The replica creates and drops the reconfiguration induces are recorded
341/// separately, carrying [`CreateOrDropClusterReplicaReasonV1::Reconfiguration`];
342/// this event family records the cluster-level transitions those replica
343/// lifecycle events hang off of.
344#[derive(
345    Clone,
346    Debug,
347    Serialize,
348    Deserialize,
349    PartialOrd,
350    PartialEq,
351    Eq,
352    Ord,
353    Hash,
354    Arbitrary
355)]
356#[serde(rename_all = "kebab-case")]
357pub enum ReconfigurationLifecycleV1 {
358    /// A reconfiguration record was written or re-targeted: the cluster is now
359    /// converging onto a new target shape.
360    Started,
361    /// The realized config cut over to the target and the record was cleared:
362    /// a hydrated success under either `ON TIMEOUT` action (including one that
363    /// hydrated after the deadline, since success takes precedence) or a
364    /// forced `ON TIMEOUT COMMIT` cut-over of a not-yet-hydrated target past
365    /// the deadline. The event carries the record's deadline; comparing it to
366    /// the event's occurrence time tells an in-time cut-over from a late or
367    /// forced one.
368    Finalized,
369    /// The deadline fired with the target not hydrated under `ON TIMEOUT
370    /// ROLLBACK`: the record was cleared with the realized config untouched and
371    /// the target replicas dropped, reverting to the pre-reconfiguration set.
372    /// Emitted exactly once per timeout. The clear is durable, so the
373    /// transition cannot re-fire. This event is the timeout's papertrail; the
374    /// record (and with it the abandoned target) is gone from the catalog.
375    TimedOut,
376    /// An in-flight reconfiguration was cancelled by re-targeting the record
377    /// back to the cluster's still-realized shape (the ALTER-back cancel path);
378    /// the controller drops the in-flight target replicas and clears the record.
379    Cancelled,
380}
381
382/// A cluster-level transition in a background reconfiguration's lifecycle.
383///
384/// `deadline` is the reconfiguration's active deadline as a millisecond
385/// `mz_timestamp`, recorded on every transition so an operator can correlate
386/// the transition with the originating `ALTER`: on `started` and `cancelled`
387/// the written/re-targeted record's deadline, on `timed-out` and `finalized`
388/// the just-cleared record's. On `finalized`, comparing it to the event's
389/// occurrence time distinguishes an in-time cut-over from a late or forced
390/// (`ON TIMEOUT COMMIT`) one.
391#[derive(
392    Clone,
393    Debug,
394    Serialize,
395    Deserialize,
396    PartialOrd,
397    PartialEq,
398    Eq,
399    Ord,
400    Hash,
401    Arbitrary
402)]
403pub struct AlterClusterReconfigurationV1 {
404    pub cluster_id: String,
405    pub cluster_name: String,
406    pub transition: ReconfigurationLifecycleV1,
407    pub target_size: String,
408    pub target_replication_factor: u32,
409    pub target_availability_zones: Vec<String>,
410    pub target_logging: ClusterReplicaLoggingV1,
411    pub deadline: Option<u64>,
412}
413
414/// A managed cluster's introspection-logging config, recorded on a
415/// reconfiguration event so the papertrail captures an introspection-only
416/// `ALTER` (which otherwise leaves `target_size` and
417/// `target_replication_factor` unchanged from the realized shape). Mirrors the
418/// durable `ReplicaLogging`: `log_logging` is `INTROSPECTION DEBUGGING`,
419/// `interval` is `INTROSPECTION INTERVAL` (`None` disables introspection).
420#[derive(
421    Clone,
422    Debug,
423    Serialize,
424    Deserialize,
425    PartialOrd,
426    PartialEq,
427    Eq,
428    Ord,
429    Hash,
430    Arbitrary
431)]
432pub struct ClusterReplicaLoggingV1 {
433    pub log_logging: bool,
434    pub interval: Option<Duration>,
435}
436
437#[derive(
438    Clone,
439    Debug,
440    Serialize,
441    Deserialize,
442    PartialOrd,
443    PartialEq,
444    Eq,
445    Ord,
446    Hash,
447    Arbitrary
448)]
449pub struct CreateRoleV1 {
450    pub id: String,
451    pub name: String,
452    pub auto_provision_source: Option<String>,
453}
454
455/// A transition in the lifecycle of a hydration burst (a `burst` record on a
456/// managed cluster), recorded so an operator can trace a controller-initiated
457/// burst from start to teardown.
458///
459/// The burst replica's create and drop are recorded separately, carrying
460/// [`CreateOrDropClusterReplicaReasonV1::HydrationBurst`]; this event family
461/// records the cluster-level transitions those replica lifecycle events hang off.
462#[derive(
463    Clone,
464    Debug,
465    Serialize,
466    Deserialize,
467    PartialOrd,
468    PartialEq,
469    Eq,
470    Ord,
471    Hash,
472    Arbitrary
473)]
474#[serde(rename_all = "kebab-case")]
475pub enum HydrationBurstLifecycleV1 {
476    /// A `burst` record was written: the controller is now running a burst
477    /// replica to accelerate hydration.
478    Started,
479    /// The `burst` record was cleared: the burst replica is torn down (its linger
480    /// elapsed after the steady set hydrated, or the burst is no longer warranted).
481    Finished,
482}
483
484/// A cluster-level transition in a hydration burst's lifecycle.
485#[derive(
486    Clone,
487    Debug,
488    Serialize,
489    Deserialize,
490    PartialOrd,
491    PartialEq,
492    Eq,
493    Ord,
494    Hash,
495    Arbitrary
496)]
497pub struct ClusterHydrationBurstV1 {
498    pub cluster_id: String,
499    pub cluster_name: String,
500    pub transition: HydrationBurstLifecycleV1,
501    /// The size of the burst replica the record runs.
502    pub burst_size: String,
503}
504
505#[derive(
506    Clone,
507    Debug,
508    Serialize,
509    Deserialize,
510    PartialOrd,
511    PartialEq,
512    Eq,
513    Ord,
514    Hash,
515    Arbitrary
516)]
517pub struct RenameItemV1 {
518    pub id: String,
519    pub old_name: FullNameV1,
520    pub new_name: FullNameV1,
521}
522
523#[derive(
524    Clone,
525    Debug,
526    Serialize,
527    Deserialize,
528    PartialOrd,
529    PartialEq,
530    Eq,
531    Ord,
532    Hash,
533    Arbitrary
534)]
535pub struct RenameClusterV1 {
536    pub id: String,
537    pub old_name: String,
538    pub new_name: String,
539}
540
541#[derive(
542    Clone,
543    Debug,
544    Serialize,
545    Deserialize,
546    PartialOrd,
547    PartialEq,
548    Eq,
549    Ord,
550    Hash,
551    Arbitrary
552)]
553pub struct RenameClusterReplicaV1 {
554    pub cluster_id: String,
555    pub replica_id: String,
556    pub old_name: String,
557    pub new_name: String,
558}
559
560#[derive(
561    Clone,
562    Debug,
563    Serialize,
564    Deserialize,
565    PartialOrd,
566    PartialEq,
567    Eq,
568    Ord,
569    Hash,
570    Arbitrary
571)]
572pub struct DropClusterReplicaV1 {
573    pub cluster_id: String,
574    pub cluster_name: String,
575    // Events that predate v0.32.0 will not have this field set.
576    #[serde(skip_serializing_if = "Option::is_none")]
577    pub replica_id: Option<String>,
578    pub replica_name: String,
579}
580
581#[derive(
582    Clone,
583    Debug,
584    Serialize,
585    Deserialize,
586    PartialOrd,
587    PartialEq,
588    Eq,
589    Ord,
590    Hash,
591    Arbitrary
592)]
593pub struct DropClusterReplicaV2 {
594    pub cluster_id: String,
595    pub cluster_name: String,
596    pub replica_id: Option<String>,
597    pub replica_name: String,
598    pub reason: CreateOrDropClusterReplicaReasonV1,
599    #[serde(skip_serializing_if = "Option::is_none")]
600    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
601}
602
603#[derive(
604    Clone,
605    Debug,
606    Serialize,
607    Deserialize,
608    PartialOrd,
609    PartialEq,
610    Eq,
611    Ord,
612    Hash,
613    Arbitrary
614)]
615pub struct DropClusterReplicaV3 {
616    pub cluster_id: String,
617    pub cluster_name: String,
618    pub replica_id: Option<String>,
619    pub replica_name: String,
620    pub reason: CreateOrDropClusterReplicaReasonV1,
621    #[serde(skip_serializing_if = "Option::is_none")]
622    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
623}
624
625#[derive(
626    Clone,
627    Debug,
628    Serialize,
629    Deserialize,
630    PartialOrd,
631    PartialEq,
632    Eq,
633    Ord,
634    Hash,
635    Arbitrary
636)]
637pub struct CreateClusterReplicaV1 {
638    pub cluster_id: String,
639    pub cluster_name: String,
640    // Events that predate v0.32.0 will not have this field set.
641    #[serde(skip_serializing_if = "Option::is_none")]
642    pub replica_id: Option<String>,
643    pub replica_name: String,
644    pub logical_size: String,
645    pub disk: bool,
646    pub billed_as: Option<String>,
647    pub internal: bool,
648}
649
650#[derive(
651    Clone,
652    Debug,
653    Serialize,
654    Deserialize,
655    PartialOrd,
656    PartialEq,
657    Eq,
658    Ord,
659    Hash,
660    Arbitrary
661)]
662pub struct CreateClusterReplicaV2 {
663    pub cluster_id: String,
664    pub cluster_name: String,
665    pub replica_id: Option<String>,
666    pub replica_name: String,
667    pub logical_size: String,
668    pub disk: bool,
669    pub billed_as: Option<String>,
670    pub internal: bool,
671    pub reason: CreateOrDropClusterReplicaReasonV1,
672    #[serde(skip_serializing_if = "Option::is_none")]
673    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV1>,
674}
675
676#[derive(
677    Clone,
678    Debug,
679    Serialize,
680    Deserialize,
681    PartialOrd,
682    PartialEq,
683    Eq,
684    Ord,
685    Hash,
686    Arbitrary
687)]
688pub struct CreateClusterReplicaV3 {
689    pub cluster_id: String,
690    pub cluster_name: String,
691    pub replica_id: Option<String>,
692    pub replica_name: String,
693    pub logical_size: String,
694    pub disk: bool,
695    pub billed_as: Option<String>,
696    pub internal: bool,
697    pub reason: CreateOrDropClusterReplicaReasonV1,
698    #[serde(skip_serializing_if = "Option::is_none")]
699    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
700}
701
702#[derive(
703    Clone,
704    Debug,
705    Serialize,
706    Deserialize,
707    PartialOrd,
708    PartialEq,
709    Eq,
710    Ord,
711    Hash,
712    Arbitrary
713)]
714pub struct CreateClusterReplicaV4 {
715    pub cluster_id: String,
716    pub cluster_name: String,
717    pub replica_id: Option<String>,
718    pub replica_name: String,
719    pub logical_size: String,
720    pub billed_as: Option<String>,
721    pub internal: bool,
722    pub reason: CreateOrDropClusterReplicaReasonV1,
723    #[serde(skip_serializing_if = "Option::is_none")]
724    pub scheduling_policies: Option<SchedulingDecisionsWithReasonsV2>,
725}
726
727#[derive(
728    Clone,
729    Debug,
730    Serialize,
731    Deserialize,
732    PartialOrd,
733    PartialEq,
734    Eq,
735    Ord,
736    Hash,
737    Arbitrary
738)]
739#[serde(rename_all = "kebab-case")]
740pub enum CreateOrDropClusterReplicaReasonV1 {
741    Manual,
742    Schedule,
743    System,
744    /// The cluster controller's graceful-reconfiguration strategy created the
745    /// replica while converging a cluster onto an in-flight `reconfiguration`
746    /// target (a background `ALTER CLUSTER`).
747    Reconfiguration,
748    /// The cluster controller's hydration-burst strategy created the
749    /// transient burst replica it runs while a cluster's objects are not yet
750    /// hydrated.
751    HydrationBurst,
752    /// The cluster controller dropped the replica because the cluster's
753    /// configuration no longer calls for it. NOTE: a replication-factor
754    /// decrease drop reads `retired` even though the config change itself was
755    /// user-initiated.
756    Retired,
757}
758
759/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
760/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
761/// can be settings of the policy as well as other information about the state of the system.)
762#[derive(
763    Clone,
764    Debug,
765    Serialize,
766    Deserialize,
767    PartialOrd,
768    PartialEq,
769    Eq,
770    Ord,
771    Hash,
772    Arbitrary
773)]
774pub struct SchedulingDecisionsWithReasonsV1 {
775    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
776    pub on_refresh: RefreshDecisionWithReasonV1,
777}
778
779/// The reason for the automated cluster scheduling to turn a cluster On or Off. Each existing
780/// policy's On/Off opinion should be recorded, along with their reasons. (Among the reasons there
781/// can be settings of the policy as well as other information about the state of the system.)
782#[derive(
783    Clone,
784    Debug,
785    Serialize,
786    Deserialize,
787    PartialOrd,
788    PartialEq,
789    Eq,
790    Ord,
791    Hash,
792    Arbitrary
793)]
794pub struct SchedulingDecisionsWithReasonsV2 {
795    /// The reason for the refresh policy for wanting to turn a cluster On or Off.
796    pub on_refresh: RefreshDecisionWithReasonV2,
797}
798
799#[derive(
800    Clone,
801    Debug,
802    Serialize,
803    Deserialize,
804    PartialOrd,
805    PartialEq,
806    Eq,
807    Ord,
808    Hash,
809    Arbitrary
810)]
811pub struct RefreshDecisionWithReasonV1 {
812    pub decision: SchedulingDecisionV1,
813    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
814    /// time estimate).
815    pub objects_needing_refresh: Vec<String>,
816    /// The HYDRATION TIME ESTIMATE setting of the cluster.
817    pub hydration_time_estimate: String,
818}
819
820#[derive(
821    Clone,
822    Debug,
823    Serialize,
824    Deserialize,
825    PartialOrd,
826    PartialEq,
827    Eq,
828    Ord,
829    Hash,
830    Arbitrary
831)]
832pub struct RefreshDecisionWithReasonV2 {
833    pub decision: SchedulingDecisionV1,
834    /// Objects that currently need a refresh on the cluster (taking into account the rehydration
835    /// time estimate), and therefore should keep the cluster On.
836    pub objects_needing_refresh: Vec<String>,
837    /// Objects for which we estimate that they currently need Persist compaction, and therefore
838    /// should keep the cluster On.
839    pub objects_needing_compaction: Vec<String>,
840    /// The HYDRATION TIME ESTIMATE setting of the cluster.
841    pub hydration_time_estimate: String,
842}
843
844#[derive(
845    Clone,
846    Debug,
847    Serialize,
848    Deserialize,
849    PartialOrd,
850    PartialEq,
851    Eq,
852    Ord,
853    Hash,
854    Arbitrary
855)]
856#[serde(rename_all = "kebab-case")]
857pub enum SchedulingDecisionV1 {
858    On,
859    Off,
860}
861
862impl From<bool> for SchedulingDecisionV1 {
863    fn from(value: bool) -> Self {
864        match value {
865            true => SchedulingDecisionV1::On,
866            false => SchedulingDecisionV1::Off,
867        }
868    }
869}
870
871#[derive(
872    Clone,
873    Debug,
874    Serialize,
875    Deserialize,
876    PartialOrd,
877    PartialEq,
878    Eq,
879    Ord,
880    Hash,
881    Arbitrary
882)]
883pub struct CreateSourceSinkV1 {
884    pub id: String,
885    #[serde(flatten)]
886    pub name: FullNameV1,
887    pub size: Option<String>,
888}
889
890#[derive(
891    Clone,
892    Debug,
893    Serialize,
894    Deserialize,
895    PartialOrd,
896    PartialEq,
897    Eq,
898    Ord,
899    Hash,
900    Arbitrary
901)]
902pub struct CreateSourceSinkV2 {
903    pub id: String,
904    #[serde(flatten)]
905    pub name: FullNameV1,
906    pub size: Option<String>,
907    #[serde(rename = "type")]
908    pub external_type: String,
909}
910
911#[derive(
912    Clone,
913    Debug,
914    Serialize,
915    Deserialize,
916    PartialOrd,
917    PartialEq,
918    Eq,
919    Ord,
920    Hash,
921    Arbitrary
922)]
923pub struct CreateSourceSinkV3 {
924    pub id: String,
925    #[serde(flatten)]
926    pub name: FullNameV1,
927    #[serde(rename = "type")]
928    pub external_type: String,
929}
930
931#[derive(
932    Clone,
933    Debug,
934    Serialize,
935    Deserialize,
936    PartialOrd,
937    PartialEq,
938    Eq,
939    Ord,
940    Hash,
941    Arbitrary
942)]
943pub struct CreateSourceSinkV4 {
944    pub id: String,
945    pub cluster_id: Option<String>,
946    #[serde(flatten)]
947    pub name: FullNameV1,
948    #[serde(rename = "type")]
949    pub external_type: String,
950}
951
952#[derive(
953    Clone,
954    Debug,
955    Serialize,
956    Deserialize,
957    PartialOrd,
958    PartialEq,
959    Eq,
960    Ord,
961    Hash,
962    Arbitrary
963)]
964pub struct CreateIndexV1 {
965    pub id: String,
966    pub cluster_id: String,
967    #[serde(flatten)]
968    pub name: FullNameV1,
969}
970
971#[derive(
972    Clone,
973    Debug,
974    Serialize,
975    Deserialize,
976    PartialOrd,
977    PartialEq,
978    Eq,
979    Ord,
980    Hash,
981    Arbitrary
982)]
983pub struct CreateMaterializedViewV1 {
984    pub id: String,
985    pub cluster_id: String,
986    #[serde(flatten)]
987    pub name: FullNameV1,
988    #[serde(skip_serializing_if = "Option::is_none")]
989    pub replacement_target_id: Option<String>,
990}
991
992#[derive(
993    Clone,
994    Debug,
995    Serialize,
996    Deserialize,
997    PartialOrd,
998    PartialEq,
999    Eq,
1000    Ord,
1001    Hash,
1002    Arbitrary
1003)]
1004pub struct AlterApplyReplacementV1 {
1005    #[serde(flatten)]
1006    pub target: IdFullNameV1,
1007    pub replacement: IdFullNameV1,
1008}
1009
1010#[derive(
1011    Clone,
1012    Debug,
1013    Serialize,
1014    Deserialize,
1015    PartialOrd,
1016    PartialEq,
1017    Eq,
1018    Ord,
1019    Hash,
1020    Arbitrary
1021)]
1022pub struct AlterSourceSinkV1 {
1023    pub id: String,
1024    #[serde(flatten)]
1025    pub name: FullNameV1,
1026    pub old_size: Option<String>,
1027    pub new_size: Option<String>,
1028}
1029
1030#[derive(
1031    Clone,
1032    Debug,
1033    Serialize,
1034    Deserialize,
1035    PartialOrd,
1036    PartialEq,
1037    Eq,
1038    Ord,
1039    Hash,
1040    Arbitrary
1041)]
1042pub struct AlterSetClusterV1 {
1043    pub id: String,
1044    #[serde(flatten)]
1045    pub name: FullNameV1,
1046    pub old_cluster_id: String,
1047    pub new_cluster_id: String,
1048}
1049
1050#[derive(
1051    Clone,
1052    Debug,
1053    Serialize,
1054    Deserialize,
1055    PartialOrd,
1056    PartialEq,
1057    Eq,
1058    Ord,
1059    Hash,
1060    Arbitrary
1061)]
1062pub struct GrantRoleV1 {
1063    pub role_id: String,
1064    pub member_id: String,
1065    pub grantor_id: String,
1066}
1067
1068#[derive(
1069    Clone,
1070    Debug,
1071    Serialize,
1072    Deserialize,
1073    PartialOrd,
1074    PartialEq,
1075    Eq,
1076    Ord,
1077    Hash,
1078    Arbitrary
1079)]
1080pub struct GrantRoleV2 {
1081    pub role_id: String,
1082    pub member_id: String,
1083    pub grantor_id: String,
1084    pub executed_by: String,
1085}
1086
1087#[derive(
1088    Clone,
1089    Debug,
1090    Serialize,
1091    Deserialize,
1092    PartialOrd,
1093    PartialEq,
1094    Eq,
1095    Ord,
1096    Hash,
1097    Arbitrary
1098)]
1099pub struct RevokeRoleV1 {
1100    pub role_id: String,
1101    pub member_id: String,
1102}
1103
1104#[derive(
1105    Clone,
1106    Debug,
1107    Serialize,
1108    Deserialize,
1109    PartialOrd,
1110    PartialEq,
1111    Eq,
1112    Ord,
1113    Hash,
1114    Arbitrary
1115)]
1116pub struct RevokeRoleV2 {
1117    pub role_id: String,
1118    pub member_id: String,
1119    pub grantor_id: String,
1120    pub executed_by: String,
1121}
1122
1123#[derive(
1124    Clone,
1125    Debug,
1126    Serialize,
1127    Deserialize,
1128    PartialOrd,
1129    PartialEq,
1130    Eq,
1131    Ord,
1132    Hash,
1133    Arbitrary
1134)]
1135pub struct UpdatePrivilegeV1 {
1136    pub object_id: String,
1137    pub grantee_id: String,
1138    pub grantor_id: String,
1139    pub privileges: String,
1140}
1141
1142#[derive(
1143    Clone,
1144    Debug,
1145    Serialize,
1146    Deserialize,
1147    PartialOrd,
1148    PartialEq,
1149    Eq,
1150    Ord,
1151    Hash,
1152    Arbitrary
1153)]
1154pub struct AlterDefaultPrivilegeV1 {
1155    pub role_id: String,
1156    pub database_id: Option<String>,
1157    pub schema_id: Option<String>,
1158    pub grantee_id: String,
1159    pub privileges: String,
1160}
1161
1162#[derive(
1163    Clone,
1164    Debug,
1165    Serialize,
1166    Deserialize,
1167    PartialOrd,
1168    PartialEq,
1169    Eq,
1170    Ord,
1171    Hash,
1172    Arbitrary
1173)]
1174pub struct UpdateOwnerV1 {
1175    pub object_id: String,
1176    pub old_owner_id: String,
1177    pub new_owner_id: String,
1178}
1179
1180#[derive(
1181    Clone,
1182    Debug,
1183    Serialize,
1184    Deserialize,
1185    PartialOrd,
1186    PartialEq,
1187    Eq,
1188    Ord,
1189    Hash,
1190    Arbitrary
1191)]
1192pub struct SchemaV1 {
1193    pub id: String,
1194    pub name: String,
1195    pub database_name: String,
1196}
1197
1198#[derive(
1199    Clone,
1200    Debug,
1201    Serialize,
1202    Deserialize,
1203    PartialOrd,
1204    PartialEq,
1205    Eq,
1206    Ord,
1207    Hash,
1208    Arbitrary
1209)]
1210pub struct SchemaV2 {
1211    pub id: String,
1212    pub name: String,
1213    pub database_name: Option<String>,
1214}
1215
1216#[derive(
1217    Clone,
1218    Debug,
1219    Serialize,
1220    Deserialize,
1221    PartialOrd,
1222    PartialEq,
1223    Eq,
1224    Ord,
1225    Hash,
1226    Arbitrary
1227)]
1228pub struct RenameSchemaV1 {
1229    pub id: String,
1230    pub database_name: Option<String>,
1231    pub old_name: String,
1232    pub new_name: String,
1233}
1234
1235#[derive(
1236    Clone,
1237    Debug,
1238    Serialize,
1239    Deserialize,
1240    PartialOrd,
1241    PartialEq,
1242    Eq,
1243    Ord,
1244    Hash,
1245    Arbitrary
1246)]
1247pub struct AlterRetainHistoryV1 {
1248    pub id: String,
1249    pub old_history: Option<String>,
1250    pub new_history: Option<String>,
1251}
1252
1253#[derive(
1254    Clone,
1255    Debug,
1256    Serialize,
1257    Deserialize,
1258    PartialOrd,
1259    PartialEq,
1260    Eq,
1261    Ord,
1262    Hash,
1263    Arbitrary
1264)]
1265pub struct AlterAddColumnV1 {
1266    pub id: String,
1267    pub column: String,
1268    pub column_type: String,
1269    pub nullable: bool,
1270}
1271
1272#[derive(
1273    Clone,
1274    Debug,
1275    Serialize,
1276    Deserialize,
1277    PartialOrd,
1278    PartialEq,
1279    Eq,
1280    Ord,
1281    Hash,
1282    Arbitrary
1283)]
1284pub struct AlterSourceTimestampIntervalV1 {
1285    pub id: String,
1286    pub old_interval: Option<String>,
1287    pub new_interval: Option<String>,
1288}
1289
1290#[derive(
1291    Clone,
1292    Debug,
1293    Serialize,
1294    Deserialize,
1295    PartialOrd,
1296    PartialEq,
1297    Eq,
1298    Ord,
1299    Hash,
1300    Arbitrary
1301)]
1302pub struct UpdateItemV1 {
1303    pub id: String,
1304    #[serde(flatten)]
1305    pub name: FullNameV1,
1306}
1307
1308#[derive(
1309    Clone,
1310    Debug,
1311    Serialize,
1312    Deserialize,
1313    PartialOrd,
1314    PartialEq,
1315    Eq,
1316    Ord,
1317    Hash,
1318    Arbitrary
1319)]
1320pub struct ToNewIdV1 {
1321    pub id: String,
1322    pub new_id: String,
1323}
1324
1325#[derive(
1326    Clone,
1327    Debug,
1328    Serialize,
1329    Deserialize,
1330    PartialOrd,
1331    PartialEq,
1332    Eq,
1333    Ord,
1334    Hash,
1335    Arbitrary
1336)]
1337pub struct FromPreviousIdV1 {
1338    pub id: String,
1339    pub previous_id: String,
1340}
1341
1342impl EventDetails {
1343    pub fn as_json(&self) -> serde_json::Value {
1344        match self {
1345            EventDetails::CreateClusterReplicaV1(v) => {
1346                serde_json::to_value(v).expect("must serialize")
1347            }
1348            EventDetails::CreateClusterReplicaV2(v) => {
1349                serde_json::to_value(v).expect("must serialize")
1350            }
1351            EventDetails::CreateClusterReplicaV3(v) => {
1352                serde_json::to_value(v).expect("must serialize")
1353            }
1354            EventDetails::CreateClusterReplicaV4(v) => {
1355                serde_json::to_value(v).expect("must serialize")
1356            }
1357            EventDetails::DropClusterReplicaV1(v) => {
1358                serde_json::to_value(v).expect("must serialize")
1359            }
1360            EventDetails::DropClusterReplicaV2(v) => {
1361                serde_json::to_value(v).expect("must serialize")
1362            }
1363            EventDetails::DropClusterReplicaV3(v) => {
1364                serde_json::to_value(v).expect("must serialize")
1365            }
1366            EventDetails::IdFullNameV1(v) => serde_json::to_value(v).expect("must serialize"),
1367            EventDetails::RenameClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
1368            EventDetails::RenameClusterReplicaV1(v) => {
1369                serde_json::to_value(v).expect("must serialize")
1370            }
1371            EventDetails::AlterClusterReconfigurationV1(v) => {
1372                serde_json::to_value(v).expect("must serialize")
1373            }
1374            EventDetails::ClusterHydrationBurstV1(v) => {
1375                serde_json::to_value(v).expect("must serialize")
1376            }
1377            EventDetails::RenameItemV1(v) => serde_json::to_value(v).expect("must serialize"),
1378            EventDetails::IdNameV1(v) => serde_json::to_value(v).expect("must serialize"),
1379            EventDetails::SchemaV1(v) => serde_json::to_value(v).expect("must serialize"),
1380            EventDetails::SchemaV2(v) => serde_json::to_value(v).expect("must serialize"),
1381            EventDetails::RenameSchemaV1(v) => serde_json::to_value(v).expect("must serialize"),
1382            EventDetails::CreateSourceSinkV1(v) => serde_json::to_value(v).expect("must serialize"),
1383            EventDetails::CreateSourceSinkV2(v) => serde_json::to_value(v).expect("must serialize"),
1384            EventDetails::CreateSourceSinkV3(v) => serde_json::to_value(v).expect("must serialize"),
1385            EventDetails::CreateSourceSinkV4(v) => serde_json::to_value(v).expect("must serialize"),
1386            EventDetails::CreateIndexV1(v) => serde_json::to_value(v).expect("must serialize"),
1387            EventDetails::CreateMaterializedViewV1(v) => {
1388                serde_json::to_value(v).expect("must serialize")
1389            }
1390            EventDetails::AlterApplyReplacementV1(v) => {
1391                serde_json::to_value(v).expect("must serialize")
1392            }
1393            EventDetails::AlterSourceSinkV1(v) => serde_json::to_value(v).expect("must serialize"),
1394            EventDetails::AlterSetClusterV1(v) => serde_json::to_value(v).expect("must serialize"),
1395            EventDetails::GrantRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
1396            EventDetails::GrantRoleV2(v) => serde_json::to_value(v).expect("must serialize"),
1397            EventDetails::RevokeRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
1398            EventDetails::RevokeRoleV2(v) => serde_json::to_value(v).expect("must serialize"),
1399            EventDetails::UpdatePrivilegeV1(v) => serde_json::to_value(v).expect("must serialize"),
1400            EventDetails::AlterDefaultPrivilegeV1(v) => {
1401                serde_json::to_value(v).expect("must serialize")
1402            }
1403            EventDetails::UpdateOwnerV1(v) => serde_json::to_value(v).expect("must serialize"),
1404            EventDetails::UpdateItemV1(v) => serde_json::to_value(v).expect("must serialize"),
1405            EventDetails::AlterRetainHistoryV1(v) => {
1406                serde_json::to_value(v).expect("must serialize")
1407            }
1408            EventDetails::AlterAddColumnV1(v) => serde_json::to_value(v).expect("must serialize"),
1409            EventDetails::AlterSourceTimestampIntervalV1(v) => {
1410                serde_json::to_value(v).expect("must serialize")
1411            }
1412            EventDetails::ToNewIdV1(v) => serde_json::to_value(v).expect("must serialize"),
1413            EventDetails::FromPreviousIdV1(v) => serde_json::to_value(v).expect("must serialize"),
1414            EventDetails::SetV1(v) => serde_json::to_value(v).expect("must serialize"),
1415            EventDetails::ResetAllV1 => serde_json::Value::Null,
1416            EventDetails::RotateKeysV1(v) => serde_json::to_value(v).expect("must serialize"),
1417            EventDetails::CreateRoleV1(v) => serde_json::to_value(v).expect("must serialize"),
1418        }
1419    }
1420}
1421
1422#[derive(
1423    Clone,
1424    Debug,
1425    Serialize,
1426    Deserialize,
1427    PartialOrd,
1428    PartialEq,
1429    Eq,
1430    Ord,
1431    Hash,
1432    Arbitrary
1433)]
1434pub struct EventV1 {
1435    pub id: u64,
1436    pub event_type: EventType,
1437    pub object_type: ObjectType,
1438    pub details: EventDetails,
1439    pub user: Option<String>,
1440    pub occurred_at: EpochMillis,
1441}
1442
1443impl EventV1 {
1444    fn new(
1445        id: u64,
1446        event_type: EventType,
1447        object_type: ObjectType,
1448        details: EventDetails,
1449        user: Option<String>,
1450        occurred_at: EpochMillis,
1451    ) -> EventV1 {
1452        EventV1 {
1453            id,
1454            event_type,
1455            object_type,
1456            details,
1457            user,
1458            occurred_at,
1459        }
1460    }
1461}
1462
1463#[derive(
1464    Clone,
1465    Debug,
1466    Serialize,
1467    Deserialize,
1468    PartialOrd,
1469    PartialEq,
1470    Eq,
1471    Ord,
1472    Hash,
1473    Arbitrary
1474)]
1475pub struct StorageUsageV1 {
1476    pub id: u64,
1477    pub shard_id: Option<String>,
1478    pub size_bytes: u64,
1479    pub collection_timestamp: EpochMillis,
1480}
1481
1482impl StorageUsageV1 {
1483    pub fn new(
1484        id: u64,
1485        shard_id: Option<String>,
1486        size_bytes: u64,
1487        collection_timestamp: EpochMillis,
1488    ) -> StorageUsageV1 {
1489        StorageUsageV1 {
1490            id,
1491            shard_id,
1492            size_bytes,
1493            collection_timestamp,
1494        }
1495    }
1496}
1497
1498/// Describes the environment's storage usage at a point in time.
1499///
1500/// This type is persisted in the catalog across restarts, so any updates to the
1501/// schema will require a new version.
1502#[derive(
1503    Clone,
1504    Debug,
1505    Serialize,
1506    Deserialize,
1507    PartialOrd,
1508    PartialEq,
1509    Eq,
1510    Ord,
1511    Hash,
1512    Arbitrary
1513)]
1514pub enum VersionedStorageUsage {
1515    V1(StorageUsageV1),
1516}
1517
1518impl VersionedStorageUsage {
1519    /// Create a new metric snapshot.
1520    /// This function must always require and produce the most
1521    /// recent variant of VersionedStorageMetrics.
1522    pub fn new(
1523        id: u64,
1524        object_id: Option<String>,
1525        size_bytes: u64,
1526        collection_timestamp: EpochMillis,
1527    ) -> Self {
1528        Self::V1(StorageUsageV1::new(
1529            id,
1530            object_id,
1531            size_bytes,
1532            collection_timestamp,
1533        ))
1534    }
1535
1536    // Implement deserialize and serialize so writers and readers don't have to
1537    // coordinate about which Serializer to use.
1538    pub fn deserialize(data: &[u8]) -> Result<Self, anyhow::Error> {
1539        Ok(serde_json::from_slice(data)?)
1540    }
1541
1542    pub fn serialize(&self) -> Vec<u8> {
1543        serde_json::to_vec(self).expect("must serialize")
1544    }
1545
1546    pub fn timestamp(&self) -> EpochMillis {
1547        match self {
1548            VersionedStorageUsage::V1(StorageUsageV1 {
1549                collection_timestamp,
1550                ..
1551            }) => *collection_timestamp,
1552        }
1553    }
1554
1555    /// Returns a globally sortable event order. All event versions must have this
1556    /// field.
1557    pub fn sortable_id(&self) -> u64 {
1558        match self {
1559            VersionedStorageUsage::V1(usage) => usage.id,
1560        }
1561    }
1562}
1563
#[cfg(test)]
mod tests {
    use crate::{EventDetails, EventType, EventV1, IdNameV1, ObjectType, VersionedEvent};

    // Covers all versions of events. The expected JSON is hard-coded on
    // purpose: a change to the data structures in this crate makes this test
    // fail, which is the cue to add a new variant instead of editing an
    // existing one.
    #[mz_ore::test]
    fn test_audit_log() -> Result<(), anyhow::Error> {
        let drop_replica = VersionedEvent::V1(EventV1::new(
            2,
            EventType::Drop,
            ObjectType::ClusterReplica,
            EventDetails::IdNameV1(IdNameV1 {
                id: "u1".to_string(),
                name: "name".into(),
            }),
            None,
            2,
        ));

        // Each case pairs an event with its exact serialized form.
        let cases: Vec<(VersionedEvent, &'static str)> = vec![(
            drop_replica,
            r#"{"V1":{"id":2,"event_type":"drop","object_type":"cluster-replica","details":{"IdNameV1":{"id":"u1","name":"name"}},"user":null,"occurred_at":2}}"#,
        )];

        for (event, expected_json) in cases {
            let actual = serde_json::to_vec(&event).unwrap();
            assert_eq!(
                actual,
                expected_json.as_bytes(),
                "expected bytes {}, got {}",
                expected_json,
                std::str::from_utf8(&actual).unwrap(),
            );
        }

        Ok(())
    }
}