1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! Segment library for Rust.
11//!
//! This crate provides a client library for the [Segment] analytics platform.
13//! It is a small wrapper around the [`segment`] crate to provide a more
14//! ergonomic interface.
15//!
16//! [Segment]: https://segment.com
17//! [`segment`]: https://docs.rs/segment
1819use std::fmt;
2021use chrono::{DateTime, Utc};
22use segment::message::{Batch, BatchMessage, Group, Message, Track, User};
23use segment::{Batcher, Client as _, HttpClient};
24use time::OffsetDateTime;
25use tokio::sync::mpsc::error::TrySendError;
26use tokio::sync::mpsc::{self, Receiver, Sender};
27use tracing::{error, warn};
28use uuid::Uuid;
/// Upper bound on events that are queued but not yet delivered.
///
/// When the queue already holds this many events, any further event is
/// discarded instead of being enqueued.
const MAX_PENDING_EVENTS: usize = 1 << 15;
/// Settings used to construct a Segment [`Client`].
pub struct Config {
    /// API key that Segment uses to authenticate incoming events.
    pub api_key: String,
    /// Whether the client runs on the client side instead of the server side.
    ///
    /// Turning this on makes the Segment server record the IP address that
    /// each event was sent from.
    pub client_side: bool,
}
4445/// A [Segment] API client.
46///
47/// Event delivery is best effort. There is no guarantee that a given event
48/// will be delivered to Segment.
49///
50/// [Segment]: https://segment.com
51#[derive(Clone)]
52pub struct Client {
53 client_side: bool,
54 tx: Sender<BatchMessage>,
55}
5657impl fmt::Debug for Client {
58fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 f.write_str("<segment client>")
60 }
61}
6263impl Client {
64/// Creates a new client.
65pub fn new(
66 Config {
67 api_key,
68 client_side,
69 }: Config,
70 ) -> Client {
71let (tx, rx) = mpsc::channel(MAX_PENDING_EVENTS);
7273let send_task = SendTask {
74 api_key,
75 http_client: HttpClient::default(),
76 };
77 mz_ore::task::spawn(
78 || "segment_send_task",
79async move { send_task.run(rx).await },
80 );
8182 Client { client_side, tx }
83 }
8485/// Creates a new dummy client that doesn't report anything but still
86 /// accepts updates.
87pub fn new_dummy_client() -> Client {
88let (tx, mut rx) = mpsc::channel(MAX_PENDING_EVENTS);
8990 mz_ore::task::spawn(|| "segment_send_task", async move {
91while let Some(msg) = rx.recv().await {
92tracing::debug!(?msg, "segment update");
93 }
94 });
9596 Client {
97 client_side: true,
98 tx,
99 }
100 }
101102/// Sends a new [track event] to Segment.
103 ///
104 /// Delivery happens asynchronously on a background thread. It is best
105 /// effort. There is no guarantee that the event will be delivered to
106 /// Segment. Events may be dropped when the client is backlogged. Errors are
107 /// logged but not returned.
108 ///
109 /// [track event]: https://segment.com/docs/connections/spec/track/
110pub fn track<S>(
111&self,
112 user_id: Uuid,
113 event: S,
114 properties: serde_json::Value,
115 context: Option<serde_json::Value>,
116 timestamp: Option<DateTime<Utc>>,
117 ) where
118S: Into<String>,
119 {
120let timestamp = match timestamp {
121None => None,
122Some(t) => match OffsetDateTime::from_unix_timestamp(t.timestamp())
123 .and_then(|odt| odt.replace_nanosecond(t.timestamp_subsec_nanos()))
124 {
125Ok(timestamp) => Some(timestamp),
126Err(e) => {
127error!(%e, "failed to convert timestamp for Segment event");
128return;
129 }
130 },
131 };
132self.send(BatchMessage::Track(Track {
133 user: User::UserId {
134 user_id: user_id.to_string(),
135 },
136 event: event.into(),
137 properties,
138 context,
139 timestamp,
140 ..Default::default()
141 }));
142 }
143144/// Sends a new [group event] to Segment.
145 ///
146 /// Delivery happens asynchronously on a background thread. It is best
147 /// effort. There is no guarantee that the event will be delivered to
148 /// Segment. Events may be dropped when the client is backlogged. Errors are
149 /// logged but not returned.
150 ///
151 /// [track event]: https://segment.com/docs/connections/spec/group/
152pub fn group(&self, user_id: Uuid, group_id: Uuid, traits: serde_json::Value) {
153self.send(BatchMessage::Group(Group {
154 user: User::UserId {
155 user_id: user_id.to_string(),
156 },
157 group_id: group_id.to_string(),
158 traits,
159 ..Default::default()
160 }));
161 }
162163fn send(&self, mut message: BatchMessage) {
164if self.client_side {
165// If running on the client side, pretend to be the Analytics.js
166 // library by force-setting the appropriate library name in the
167 // context. This is how Segment determines whether to attach an IP
168 // to the incoming requests.
169let context = match &mut message {
170 BatchMessage::Alias(a) => &mut a.context,
171 BatchMessage::Group(i) => &mut i.context,
172 BatchMessage::Identify(i) => &mut i.context,
173 BatchMessage::Page(i) => &mut i.context,
174 BatchMessage::Screen(i) => &mut i.context,
175 BatchMessage::Track(t) => &mut t.context,
176 };
177let context = context.get_or_insert_with(|| serde_json::json!({}));
178 context
179 .as_object_mut()
180 .expect("Segment context must be object")
181 .insert(
182"library".into(),
183serde_json::json!({
184"name": "analytics.js",
185 }),
186 );
187 }
188189match self.tx.try_send(message) {
190Ok(()) => (),
191Err(TrySendError::Closed(_)) => panic!("receiver must not drop first"),
192Err(TrySendError::Full(_)) => {
193warn!("dropping segment event because queue is full");
194 }
195 }
196 }
197}
198199struct SendTask {
200 api_key: String,
201 http_client: HttpClient,
202}
203204impl SendTask {
205async fn run(&self, mut rx: Receiver<BatchMessage>) {
206// On each turn of the loop, we accumulate all outstanding messages and
207 // send them to Segment in the largest batches possible. We never have
208 // more than one outstanding request to Segment.
209loop {
210let mut batcher = Batcher::new(None);
211212// Wait for the first event to arrive.
213match rx.recv().await {
214Some(message) => batcher = self.enqueue(batcher, message).await,
215None => return,
216 };
217218// Accumulate any other messages that are ready. `enqueue` may
219 // flush the batch to Segment if we hit the maximum batch size.
220while let Ok(message) = rx.try_recv() {
221 batcher = self.enqueue(batcher, message).await;
222 }
223224// Drain the queue.
225self.flush(batcher).await;
226 }
227 }
228229async fn enqueue(&self, mut batcher: Batcher, message: BatchMessage) -> Batcher {
230match batcher.push(message) {
231Ok(None) => (),
232Ok(Some(message)) => {
233self.flush(batcher).await;
234 batcher = Batcher::new(None);
235 batcher
236 .push(message)
237 .expect("message cannot fail to enqueue twice");
238 }
239Err(e) => {
240warn!("error enqueueing segment message: {}", e);
241 }
242 }
243 batcher
244 }
245246async fn flush(&self, batcher: Batcher) {
247let message = batcher.into_message();
248if matches!(&message, Message::Batch(Batch { batch , .. }) if batch.is_empty()) {
249return;
250 }
251if let Err(e) = self.http_client.send(self.api_key.clone(), message).await {
252warn!("error sending message to segment: {}", e);
253 }
254 }
255}