1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Telemetry utilities.

use chrono::{DateTime, Utc};
use mz_audit_log::ObjectType;
use mz_sql::catalog::EnvironmentId;
use mz_sql_parser::ast::StatementKind;
use serde::{Deserialize, Serialize};
use serde_json::json;
use uuid::Uuid;

/// Details to attach to a Segment event.
#[derive(Debug, Clone, Default)]
pub struct EventDetails<'a> {
    /// The ID of the user that triggered the event, if any.
    pub user_id: Option<Uuid>,
    /// The value of the `application_name` parameter for the session that
    /// triggered the event, if any.
    pub application_name: Option<&'a str>,
    /// The timestamp at which the event occurred, if not the current time.
    pub timestamp: Option<DateTime<Utc>>,
}

/// Extension trait for [`mz_segment::Client`].
pub trait SegmentClientExt {
    /// Tracks an event associated with an environment.
    fn environment_track<S>(
        &self,
        environment_id: &EnvironmentId,
        event: S,
        properties: serde_json::Value,
        details: EventDetails<'_>,
    ) where
        S: Into<String>;
}

impl SegmentClientExt for mz_segment::Client {
    /// Tracks an event associated with an environment.
    ///
    /// Various metadata about the environment is automatically attached
    /// using canonical field names.
    ///
    /// # Panics
    ///
    /// Panics if `properties` is not a [`serde_json::Value::Object`].
    fn environment_track<S>(
        &self,
        environment_id: &EnvironmentId,
        event: S,
        mut properties: serde_json::Value,
        EventDetails {
            user_id,
            timestamp,
            application_name,
        }: EventDetails<'_>,
    ) where
        S: Into<String>,
    {
        {
            let properties = match &mut properties {
                serde_json::Value::Object(map) => map,
                _ => {
                    panic!("SegmentClientExt::environment_track called with non-object properties")
                }
            };
            properties.insert("event_source".into(), json!("environment"));
            properties.insert(
                "cloud_provider".into(),
                json!(environment_id.cloud_provider().to_string()),
            );
            properties.insert(
                "cloud_provider_region".into(),
                json!(environment_id.cloud_provider_region()),
            );
            if let Some(application_name) = application_name {
                properties.insert("application_name".into(), json!(application_name));
            }
        }

        // "Context" is a defined dictionary of extra information related to a datapoint. Please
        // consult the docs before adding anything here: https://segment.com/docs/connections/spec/common/#context
        let context = json!({
            "group_id": environment_id.organization_id()
        });

        // We use the organization ID as the user ID for events that are not
        // associated with a particular user.
        let user_id = user_id.unwrap_or_else(|| environment_id.organization_id());

        self.track(user_id, event, properties, Some(context), timestamp);
    }
}

/// Describes a way in which DDL statement execution can fail.
#[derive(Clone, Debug, Serialize, Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum StatementFailureType {
    /// The statement failed to parse.
    ParseFailure,
}

impl StatementFailureType {
    /// Renders the name of failure type as a title case string.
    pub fn as_title_case(&self) -> &'static str {
        match self {
            StatementFailureType::ParseFailure => "Parse Failed",
        }
    }
}

serde_plain::derive_display_from_serialize!(StatementFailureType);

/// Describes the action of a DDL statement.
#[derive(Serialize, Deserialize)]
pub enum StatementAction {
    /// The statement alters an object.
    Alter,
    /// The statement creates an object.
    Create,
}

impl StatementAction {
    /// Renders the DDL action as a title case string.
    pub fn as_title_case(&self) -> &'static str {
        match self {
            StatementAction::Alter => "Alter",
            StatementAction::Create => "Create",
        }
    }
}

/// Analyzes the action and object type of DDL statement kinds.
///
/// Returns `None` if the given statement kind is not a tracked DDL `StatementKind`. Tracked
/// statements are those which are written to the audit log.
pub fn analyze_audited_statement(
    statement: StatementKind,
) -> Option<(StatementAction, ObjectType)> {
    match statement {
        StatementKind::AlterConnection => Some((StatementAction::Alter, ObjectType::Connection)),
        StatementKind::AlterIndex => Some((StatementAction::Alter, ObjectType::Index)),
        StatementKind::AlterRole => Some((StatementAction::Alter, ObjectType::Role)),
        StatementKind::AlterSecret => Some((StatementAction::Alter, ObjectType::Secret)),
        StatementKind::AlterSource => Some((StatementAction::Alter, ObjectType::Source)),
        StatementKind::CreateCluster => Some((StatementAction::Create, ObjectType::Cluster)),
        StatementKind::CreateClusterReplica => {
            Some((StatementAction::Create, ObjectType::ClusterReplica))
        }
        StatementKind::CreateConnection => Some((StatementAction::Create, ObjectType::Connection)),
        StatementKind::CreateDatabase => Some((StatementAction::Create, ObjectType::Database)),
        StatementKind::CreateIndex => Some((StatementAction::Create, ObjectType::Index)),
        StatementKind::CreateMaterializedView => {
            Some((StatementAction::Create, ObjectType::MaterializedView))
        }
        StatementKind::CreateRole => Some((StatementAction::Create, ObjectType::Role)),
        StatementKind::CreateSchema => Some((StatementAction::Create, ObjectType::Schema)),
        StatementKind::CreateSecret => Some((StatementAction::Create, ObjectType::Secret)),
        StatementKind::CreateSink => Some((StatementAction::Create, ObjectType::Sink)),
        StatementKind::CreateSource => Some((StatementAction::Create, ObjectType::Source)),
        StatementKind::CreateTable => Some((StatementAction::Create, ObjectType::Table)),
        StatementKind::CreateView => Some((StatementAction::Create, ObjectType::View)),
        _ => None,
    }
}