1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! Segment library for Rust.
11//!
//! This crate provides a client library for the [Segment] analytics platform.
//! It is a small wrapper around the [`segment`] crate to provide a more
//! ergonomic interface.
15//!
16//! [Segment]: https://segment.com
17//! [`segment`]: https://docs.rs/segment
1819use std::fmt;
2021use chrono::{DateTime, Utc};
22use segment::message::{Batch, BatchMessage, Group, Message, Track, User};
23use segment::{Batcher, Client as _, HttpClient};
24use time::OffsetDateTime;
25use tokio::sync::mpsc::error::TrySendError;
26use tokio::sync::mpsc::{self, Receiver, Sender};
27use tracing::{error, warn};
28use uuid::Uuid;
/// Upper bound on the number of events that may be queued awaiting delivery.
/// When the queue already holds this many events, additional events are
/// dropped rather than queued.
const MAX_PENDING_EVENTS: usize = 1 << 15;
/// Configuration for a Segment [`Client`].
pub struct Config {
    /// The Segment API key with which events are authenticated.
    pub api_key: String,
    /// Set to `true` when the client runs on the client side instead of the
    /// server side.
    ///
    /// When enabled, the Segment server records the IP address from which
    /// each event was sent.
    pub client_side: bool,
}
4445/// A [Segment] API client.
46///
47/// Event delivery is best effort. There is no guarantee that a given event
48/// will be delivered to Segment.
49///
50/// [Segment]: https://segment.com
51#[derive(Clone)]
52pub struct Client {
53 client_side: bool,
54 tx: Sender<BatchMessage>,
55}
5657impl fmt::Debug for Client {
58fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 f.write_str("<segment client>")
60 }
61}
6263impl Client {
64/// Creates a new client.
65pub fn new(
66 Config {
67 api_key,
68 client_side,
69 }: Config,
70 ) -> Client {
71let (tx, rx) = mpsc::channel(MAX_PENDING_EVENTS);
7273let send_task = SendTask {
74 api_key,
75 http_client: HttpClient::default(),
76 };
77 mz_ore::task::spawn(
78 || "segment_send_task",
79async move { send_task.run(rx).await },
80 );
8182 Client { client_side, tx }
83 }
8485/// Sends a new [track event] to Segment.
86 ///
87 /// Delivery happens asynchronously on a background thread. It is best
88 /// effort. There is no guarantee that the event will be delivered to
89 /// Segment. Events may be dropped when the client is backlogged. Errors are
90 /// logged but not returned.
91 ///
92 /// [track event]: https://segment.com/docs/connections/spec/track/
93pub fn track<S>(
94&self,
95 user_id: Uuid,
96 event: S,
97 properties: serde_json::Value,
98 context: Option<serde_json::Value>,
99 timestamp: Option<DateTime<Utc>>,
100 ) where
101S: Into<String>,
102 {
103let timestamp = match timestamp {
104None => None,
105Some(t) => match OffsetDateTime::from_unix_timestamp(t.timestamp())
106 .and_then(|odt| odt.replace_nanosecond(t.timestamp_subsec_nanos()))
107 {
108Ok(timestamp) => Some(timestamp),
109Err(e) => {
110error!(%e, "failed to convert timestamp for Segment event");
111return;
112 }
113 },
114 };
115self.send(BatchMessage::Track(Track {
116 user: User::UserId {
117 user_id: user_id.to_string(),
118 },
119 event: event.into(),
120 properties,
121 context,
122 timestamp,
123 ..Default::default()
124 }));
125 }
126127/// Sends a new [group event] to Segment.
128 ///
129 /// Delivery happens asynchronously on a background thread. It is best
130 /// effort. There is no guarantee that the event will be delivered to
131 /// Segment. Events may be dropped when the client is backlogged. Errors are
132 /// logged but not returned.
133 ///
134 /// [track event]: https://segment.com/docs/connections/spec/group/
135pub fn group(&self, user_id: Uuid, group_id: Uuid, traits: serde_json::Value) {
136self.send(BatchMessage::Group(Group {
137 user: User::UserId {
138 user_id: user_id.to_string(),
139 },
140 group_id: group_id.to_string(),
141 traits,
142 ..Default::default()
143 }));
144 }
145146fn send(&self, mut message: BatchMessage) {
147if self.client_side {
148// If running on the client side, pretend to be the Analytics.js
149 // library by force-setting the appropriate library name in the
150 // context. This is how Segment determines whether to attach an IP
151 // to the incoming requests.
152let context = match &mut message {
153 BatchMessage::Alias(a) => &mut a.context,
154 BatchMessage::Group(i) => &mut i.context,
155 BatchMessage::Identify(i) => &mut i.context,
156 BatchMessage::Page(i) => &mut i.context,
157 BatchMessage::Screen(i) => &mut i.context,
158 BatchMessage::Track(t) => &mut t.context,
159 };
160let context = context.get_or_insert_with(|| serde_json::json!({}));
161 context
162 .as_object_mut()
163 .expect("Segment context must be object")
164 .insert(
165"library".into(),
166serde_json::json!({
167"name": "analytics.js",
168 }),
169 );
170 }
171172match self.tx.try_send(message) {
173Ok(()) => (),
174Err(TrySendError::Closed(_)) => panic!("receiver must not drop first"),
175Err(TrySendError::Full(_)) => {
176warn!("dropping segment event because queue is full");
177 }
178 }
179 }
180}
181182struct SendTask {
183 api_key: String,
184 http_client: HttpClient,
185}
186187impl SendTask {
188async fn run(&self, mut rx: Receiver<BatchMessage>) {
189// On each turn of the loop, we accumulate all outstanding messages and
190 // send them to Segment in the largest batches possible. We never have
191 // more than one outstanding request to Segment.
192loop {
193let mut batcher = Batcher::new(None);
194195// Wait for the first event to arrive.
196match rx.recv().await {
197Some(message) => batcher = self.enqueue(batcher, message).await,
198None => return,
199 };
200201// Accumulate any other messages that are ready. `enqueue` may
202 // flush the batch to Segment if we hit the maximum batch size.
203while let Ok(message) = rx.try_recv() {
204 batcher = self.enqueue(batcher, message).await;
205 }
206207// Drain the queue.
208self.flush(batcher).await;
209 }
210 }
211212async fn enqueue(&self, mut batcher: Batcher, message: BatchMessage) -> Batcher {
213match batcher.push(message) {
214Ok(None) => (),
215Ok(Some(message)) => {
216self.flush(batcher).await;
217 batcher = Batcher::new(None);
218 batcher
219 .push(message)
220 .expect("message cannot fail to enqueue twice");
221 }
222Err(e) => {
223warn!("error enqueueing segment message: {}", e);
224 }
225 }
226 batcher
227 }
228229async fn flush(&self, batcher: Batcher) {
230let message = batcher.into_message();
231if matches!(&message, Message::Batch(Batch { batch , .. }) if batch.is_empty()) {
232return;
233 }
234if let Err(e) = self.http_client.send(self.api_key.clone(), message).await {
235warn!("error sending message to segment: {}", e);
236 }
237 }
238}