mz_segment/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Segment library for Rust.
11//!
//! This crate provides a client library for the [Segment] analytics platform.
13//! It is a small wrapper around the [`segment`] crate to provide a more
14//! ergonomic interface.
15//!
16//! [Segment]: https://segment.com
17//! [`segment`]: https://docs.rs/segment
18
19use std::fmt;
20
21use chrono::{DateTime, Utc};
22use segment::message::{Batch, BatchMessage, Group, Message, Track, User};
23use segment::{Batcher, Client as _, HttpClient};
24use time::OffsetDateTime;
25use tokio::sync::mpsc::error::TrySendError;
26use tokio::sync::mpsc::{self, Receiver, Sender};
27use tracing::{error, warn};
28use uuid::Uuid;
29
/// Capacity of the queue of undelivered events (2^15 = 32,768 events).
///
/// Once the queue is full, newly submitted events are dropped instead of
/// blocking the caller.
const MAX_PENDING_EVENTS: usize = 32 * 1024;
33
/// Configuration for a Segment [`Client`].
pub struct Config {
    /// The API key to use to authenticate events with Segment.
    pub api_key: String,
    /// Whether this Segment client is being used on the client side (rather
    /// than the server side).
    ///
    /// Enabling this causes the Segment server to record the IP address from
    /// which the event was sent.
    pub client_side: bool,
}

// Hand-written rather than derived so that the API key never ends up in logs
// or panic messages that format a `Config` with `{:?}`.
impl fmt::Debug for Config {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Config")
            .field("api_key", &"<redacted>")
            .field("client_side", &self.client_side)
            .finish()
    }
}
44
45/// A [Segment] API client.
46///
47/// Event delivery is best effort. There is no guarantee that a given event
48/// will be delivered to Segment.
49///
50/// [Segment]: https://segment.com
51#[derive(Clone)]
52pub struct Client {
53    client_side: bool,
54    tx: Sender<BatchMessage>,
55}
56
57impl fmt::Debug for Client {
58    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59        f.write_str("<segment client>")
60    }
61}
62
63impl Client {
64    /// Creates a new client.
65    pub fn new(
66        Config {
67            api_key,
68            client_side,
69        }: Config,
70    ) -> Client {
71        let (tx, rx) = mpsc::channel(MAX_PENDING_EVENTS);
72
73        let send_task = SendTask {
74            api_key,
75            http_client: HttpClient::default(),
76        };
77        mz_ore::task::spawn(
78            || "segment_send_task",
79            async move { send_task.run(rx).await },
80        );
81
82        Client { client_side, tx }
83    }
84
85    /// Creates a new dummy client that doesn't report anything but still
86    /// accepts updates.
87    pub fn new_dummy_client() -> Client {
88        let (tx, mut rx) = mpsc::channel(MAX_PENDING_EVENTS);
89
90        mz_ore::task::spawn(|| "segment_send_task", async move {
91            while let Some(msg) = rx.recv().await {
92                tracing::debug!(?msg, "segment update");
93            }
94        });
95
96        Client {
97            client_side: true,
98            tx,
99        }
100    }
101
102    /// Sends a new [track event] to Segment.
103    ///
104    /// Delivery happens asynchronously on a background thread. It is best
105    /// effort. There is no guarantee that the event will be delivered to
106    /// Segment. Events may be dropped when the client is backlogged. Errors are
107    /// logged but not returned.
108    ///
109    /// [track event]: https://segment.com/docs/connections/spec/track/
110    pub fn track<S>(
111        &self,
112        user_id: Uuid,
113        event: S,
114        properties: serde_json::Value,
115        context: Option<serde_json::Value>,
116        timestamp: Option<DateTime<Utc>>,
117    ) where
118        S: Into<String>,
119    {
120        let timestamp = match timestamp {
121            None => None,
122            Some(t) => match OffsetDateTime::from_unix_timestamp(t.timestamp())
123                .and_then(|odt| odt.replace_nanosecond(t.timestamp_subsec_nanos()))
124            {
125                Ok(timestamp) => Some(timestamp),
126                Err(e) => {
127                    error!(%e, "failed to convert timestamp for Segment event");
128                    return;
129                }
130            },
131        };
132        self.send(BatchMessage::Track(Track {
133            user: User::UserId {
134                user_id: user_id.to_string(),
135            },
136            event: event.into(),
137            properties,
138            context,
139            timestamp,
140            ..Default::default()
141        }));
142    }
143
144    /// Sends a new [group event] to Segment.
145    ///
146    /// Delivery happens asynchronously on a background thread. It is best
147    /// effort. There is no guarantee that the event will be delivered to
148    /// Segment. Events may be dropped when the client is backlogged. Errors are
149    /// logged but not returned.
150    ///
151    /// [track event]: https://segment.com/docs/connections/spec/group/
152    pub fn group(&self, user_id: Uuid, group_id: Uuid, traits: serde_json::Value) {
153        self.send(BatchMessage::Group(Group {
154            user: User::UserId {
155                user_id: user_id.to_string(),
156            },
157            group_id: group_id.to_string(),
158            traits,
159            ..Default::default()
160        }));
161    }
162
163    fn send(&self, mut message: BatchMessage) {
164        if self.client_side {
165            // If running on the client side, pretend to be the Analytics.js
166            // library by force-setting the appropriate library name in the
167            // context. This is how Segment determines whether to attach an IP
168            // to the incoming requests.
169            let context = match &mut message {
170                BatchMessage::Alias(a) => &mut a.context,
171                BatchMessage::Group(i) => &mut i.context,
172                BatchMessage::Identify(i) => &mut i.context,
173                BatchMessage::Page(i) => &mut i.context,
174                BatchMessage::Screen(i) => &mut i.context,
175                BatchMessage::Track(t) => &mut t.context,
176            };
177            let context = context.get_or_insert_with(|| serde_json::json!({}));
178            context
179                .as_object_mut()
180                .expect("Segment context must be object")
181                .insert(
182                    "library".into(),
183                    serde_json::json!({
184                            "name": "analytics.js",
185                    }),
186                );
187        }
188
189        match self.tx.try_send(message) {
190            Ok(()) => (),
191            Err(TrySendError::Closed(_)) => panic!("receiver must not drop first"),
192            Err(TrySendError::Full(_)) => {
193                warn!("dropping segment event because queue is full");
194            }
195        }
196    }
197}
198
199struct SendTask {
200    api_key: String,
201    http_client: HttpClient,
202}
203
204impl SendTask {
205    async fn run(&self, mut rx: Receiver<BatchMessage>) {
206        // On each turn of the loop, we accumulate all outstanding messages and
207        // send them to Segment in the largest batches possible. We never have
208        // more than one outstanding request to Segment.
209        loop {
210            let mut batcher = Batcher::new(None);
211
212            // Wait for the first event to arrive.
213            match rx.recv().await {
214                Some(message) => batcher = self.enqueue(batcher, message).await,
215                None => return,
216            };
217
218            // Accumulate any other messages that are ready. `enqueue` may
219            // flush the batch to Segment if we hit the maximum batch size.
220            while let Ok(message) = rx.try_recv() {
221                batcher = self.enqueue(batcher, message).await;
222            }
223
224            // Drain the queue.
225            self.flush(batcher).await;
226        }
227    }
228
229    async fn enqueue(&self, mut batcher: Batcher, message: BatchMessage) -> Batcher {
230        match batcher.push(message) {
231            Ok(None) => (),
232            Ok(Some(message)) => {
233                self.flush(batcher).await;
234                batcher = Batcher::new(None);
235                batcher
236                    .push(message)
237                    .expect("message cannot fail to enqueue twice");
238            }
239            Err(e) => {
240                warn!("error enqueueing segment message: {}", e);
241            }
242        }
243        batcher
244    }
245
246    async fn flush(&self, batcher: Batcher) {
247        let message = batcher.into_message();
248        if matches!(&message, Message::Batch(Batch { batch , .. }) if batch.is_empty()) {
249            return;
250        }
251        if let Err(e) = self.http_client.send(self.api_key.clone(), message).await {
252            warn!("error sending message to segment: {}", e);
253        }
254    }
255}