mz_segment/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Segment library for Rust.
11//!
12//! This crate provides a library to the [Segment] analytics platform.
13//! It is a small wrapper around the [`segment`] crate to provide a more
14//! ergonomic interface.
15//!
16//! [Segment]: https://segment.com
17//! [`segment`]: https://docs.rs/segment
18
19use std::fmt;
20
21use chrono::{DateTime, Utc};
22use segment::message::{Batch, BatchMessage, Group, Message, Track, User};
23use segment::{Batcher, Client as _, HttpClient};
24use time::OffsetDateTime;
25use tokio::sync::mpsc::error::TrySendError;
26use tokio::sync::mpsc::{self, Receiver, Sender};
27use tracing::{error, warn};
28use uuid::Uuid;
29
/// Capacity of the queue of events waiting to be delivered to Segment.
///
/// When the queue is full, newly submitted events are dropped instead of
/// being queued.
const MAX_PENDING_EVENTS: usize = 32 * 1024;
33
/// Configuration for constructing a Segment [`Client`].
pub struct Config {
    /// The API key to use to authenticate events with Segment.
    pub api_key: String,
    /// Whether this Segment client is being used on the client side (rather
    /// than the server side).
    ///
    /// Enabling this causes the Segment server to record the IP address from
    /// which the event was sent.
    pub client_side: bool,
}
44
45/// A [Segment] API client.
46///
47/// Event delivery is best effort. There is no guarantee that a given event
48/// will be delivered to Segment.
49///
50/// [Segment]: https://segment.com
51#[derive(Clone)]
52pub struct Client {
53    client_side: bool,
54    tx: Sender<BatchMessage>,
55}
56
57impl fmt::Debug for Client {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        f.write_str("<segment client>")
60    }
61}
62
63impl Client {
64    /// Creates a new client.
65    pub fn new(
66        Config {
67            api_key,
68            client_side,
69        }: Config,
70    ) -> Client {
71        let (tx, rx) = mpsc::channel(MAX_PENDING_EVENTS);
72
73        let send_task = SendTask {
74            api_key,
75            http_client: HttpClient::default(),
76        };
77        mz_ore::task::spawn(
78            || "segment_send_task",
79            async move { send_task.run(rx).await },
80        );
81
82        Client { client_side, tx }
83    }
84
85    /// Sends a new [track event] to Segment.
86    ///
87    /// Delivery happens asynchronously on a background thread. It is best
88    /// effort. There is no guarantee that the event will be delivered to
89    /// Segment. Events may be dropped when the client is backlogged. Errors are
90    /// logged but not returned.
91    ///
92    /// [track event]: https://segment.com/docs/connections/spec/track/
93    pub fn track<S>(
94        &self,
95        user_id: Uuid,
96        event: S,
97        properties: serde_json::Value,
98        context: Option<serde_json::Value>,
99        timestamp: Option<DateTime<Utc>>,
100    ) where
101        S: Into<String>,
102    {
103        let timestamp = match timestamp {
104            None => None,
105            Some(t) => match OffsetDateTime::from_unix_timestamp(t.timestamp())
106                .and_then(|odt| odt.replace_nanosecond(t.timestamp_subsec_nanos()))
107            {
108                Ok(timestamp) => Some(timestamp),
109                Err(e) => {
110                    error!(%e, "failed to convert timestamp for Segment event");
111                    return;
112                }
113            },
114        };
115        self.send(BatchMessage::Track(Track {
116            user: User::UserId {
117                user_id: user_id.to_string(),
118            },
119            event: event.into(),
120            properties,
121            context,
122            timestamp,
123            ..Default::default()
124        }));
125    }
126
127    /// Sends a new [group event] to Segment.
128    ///
129    /// Delivery happens asynchronously on a background thread. It is best
130    /// effort. There is no guarantee that the event will be delivered to
131    /// Segment. Events may be dropped when the client is backlogged. Errors are
132    /// logged but not returned.
133    ///
134    /// [track event]: https://segment.com/docs/connections/spec/group/
135    pub fn group(&self, user_id: Uuid, group_id: Uuid, traits: serde_json::Value) {
136        self.send(BatchMessage::Group(Group {
137            user: User::UserId {
138                user_id: user_id.to_string(),
139            },
140            group_id: group_id.to_string(),
141            traits,
142            ..Default::default()
143        }));
144    }
145
146    fn send(&self, mut message: BatchMessage) {
147        if self.client_side {
148            // If running on the client side, pretend to be the Analytics.js
149            // library by force-setting the appropriate library name in the
150            // context. This is how Segment determines whether to attach an IP
151            // to the incoming requests.
152            let context = match &mut message {
153                BatchMessage::Alias(a) => &mut a.context,
154                BatchMessage::Group(i) => &mut i.context,
155                BatchMessage::Identify(i) => &mut i.context,
156                BatchMessage::Page(i) => &mut i.context,
157                BatchMessage::Screen(i) => &mut i.context,
158                BatchMessage::Track(t) => &mut t.context,
159            };
160            let context = context.get_or_insert_with(|| serde_json::json!({}));
161            context
162                .as_object_mut()
163                .expect("Segment context must be object")
164                .insert(
165                    "library".into(),
166                    serde_json::json!({
167                            "name": "analytics.js",
168                    }),
169                );
170        }
171
172        match self.tx.try_send(message) {
173            Ok(()) => (),
174            Err(TrySendError::Closed(_)) => panic!("receiver must not drop first"),
175            Err(TrySendError::Full(_)) => {
176                warn!("dropping segment event because queue is full");
177            }
178        }
179    }
180}
181
182struct SendTask {
183    api_key: String,
184    http_client: HttpClient,
185}
186
187impl SendTask {
188    async fn run(&self, mut rx: Receiver<BatchMessage>) {
189        // On each turn of the loop, we accumulate all outstanding messages and
190        // send them to Segment in the largest batches possible. We never have
191        // more than one outstanding request to Segment.
192        loop {
193            let mut batcher = Batcher::new(None);
194
195            // Wait for the first event to arrive.
196            match rx.recv().await {
197                Some(message) => batcher = self.enqueue(batcher, message).await,
198                None => return,
199            };
200
201            // Accumulate any other messages that are ready. `enqueue` may
202            // flush the batch to Segment if we hit the maximum batch size.
203            while let Ok(message) = rx.try_recv() {
204                batcher = self.enqueue(batcher, message).await;
205            }
206
207            // Drain the queue.
208            self.flush(batcher).await;
209        }
210    }
211
212    async fn enqueue(&self, mut batcher: Batcher, message: BatchMessage) -> Batcher {
213        match batcher.push(message) {
214            Ok(None) => (),
215            Ok(Some(message)) => {
216                self.flush(batcher).await;
217                batcher = Batcher::new(None);
218                batcher
219                    .push(message)
220                    .expect("message cannot fail to enqueue twice");
221            }
222            Err(e) => {
223                warn!("error enqueueing segment message: {}", e);
224            }
225        }
226        batcher
227    }
228
229    async fn flush(&self, batcher: Batcher) {
230        let message = batcher.into_message();
231        if matches!(&message, Message::Batch(Batch { batch , .. }) if batch.is_empty()) {
232            return;
233        }
234        if let Err(e) = self.http_client.send(self.api_key.clone(), message).await {
235            warn!("error sending message to segment: {}", e);
236        }
237    }
238}