1use std::fmt;
20
21use chrono::{DateTime, Utc};
22use segment::message::{Batch, BatchMessage, Group, Message, Track, User};
23use segment::{Batcher, Client as _, HttpClient};
24use time::OffsetDateTime;
25use tokio::sync::mpsc::error::TrySendError;
26use tokio::sync::mpsc::{self, Receiver, Sender};
27use tracing::{error, warn};
28use uuid::Uuid;
29
30const MAX_PENDING_EVENTS: usize = 32_768;
33
34pub struct Config {
35 pub api_key: String,
37 pub client_side: bool,
43}
44
45#[derive(Clone)]
52pub struct Client {
53 client_side: bool,
54 tx: Sender<BatchMessage>,
55}
56
57impl fmt::Debug for Client {
58 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
59 f.write_str("<segment client>")
60 }
61}
62
63impl Client {
64 pub fn new(
66 Config {
67 api_key,
68 client_side,
69 }: Config,
70 ) -> Client {
71 let (tx, rx) = mpsc::channel(MAX_PENDING_EVENTS);
72
73 let send_task = SendTask {
74 api_key,
75 http_client: HttpClient::default(),
76 };
77 mz_ore::task::spawn(
78 || "segment_send_task",
79 async move { send_task.run(rx).await },
80 );
81
82 Client { client_side, tx }
83 }
84
85 pub fn new_dummy_client() -> Client {
88 let (tx, mut rx) = mpsc::channel(MAX_PENDING_EVENTS);
89
90 mz_ore::task::spawn(|| "segment_send_task", async move {
91 while let Some(msg) = rx.recv().await {
92 tracing::debug!(?msg, "segment update");
93 }
94 });
95
96 Client {
97 client_side: true,
98 tx,
99 }
100 }
101
102 pub fn track<S>(
111 &self,
112 user_id: Uuid,
113 event: S,
114 properties: serde_json::Value,
115 context: Option<serde_json::Value>,
116 timestamp: Option<DateTime<Utc>>,
117 ) where
118 S: Into<String>,
119 {
120 let timestamp = match timestamp {
121 None => None,
122 Some(t) => match OffsetDateTime::from_unix_timestamp(t.timestamp())
123 .and_then(|odt| odt.replace_nanosecond(t.timestamp_subsec_nanos()))
124 {
125 Ok(timestamp) => Some(timestamp),
126 Err(e) => {
127 error!(%e, "failed to convert timestamp for Segment event");
128 return;
129 }
130 },
131 };
132 self.send(BatchMessage::Track(Track {
133 user: User::UserId {
134 user_id: user_id.to_string(),
135 },
136 event: event.into(),
137 properties,
138 context,
139 timestamp,
140 ..Default::default()
141 }));
142 }
143
144 pub fn group(&self, user_id: Uuid, group_id: Uuid, traits: serde_json::Value) {
153 self.send(BatchMessage::Group(Group {
154 user: User::UserId {
155 user_id: user_id.to_string(),
156 },
157 group_id: group_id.to_string(),
158 traits,
159 ..Default::default()
160 }));
161 }
162
163 fn send(&self, mut message: BatchMessage) {
164 if self.client_side {
165 let context = match &mut message {
170 BatchMessage::Alias(a) => &mut a.context,
171 BatchMessage::Group(i) => &mut i.context,
172 BatchMessage::Identify(i) => &mut i.context,
173 BatchMessage::Page(i) => &mut i.context,
174 BatchMessage::Screen(i) => &mut i.context,
175 BatchMessage::Track(t) => &mut t.context,
176 };
177 let context = context.get_or_insert_with(|| serde_json::json!({}));
178 context
179 .as_object_mut()
180 .expect("Segment context must be object")
181 .insert(
182 "library".into(),
183 serde_json::json!({
184 "name": "analytics.js",
185 }),
186 );
187 }
188
189 match self.tx.try_send(message) {
190 Ok(()) => (),
191 Err(TrySendError::Closed(_)) => panic!("receiver must not drop first"),
192 Err(TrySendError::Full(_)) => {
193 warn!("dropping segment event because queue is full");
194 }
195 }
196 }
197}
198
199struct SendTask {
200 api_key: String,
201 http_client: HttpClient,
202}
203
204impl SendTask {
205 async fn run(&self, mut rx: Receiver<BatchMessage>) {
206 loop {
210 let mut batcher = Batcher::new(None);
211
212 match rx.recv().await {
214 Some(message) => batcher = self.enqueue(batcher, message).await,
215 None => return,
216 };
217
218 while let Ok(message) = rx.try_recv() {
221 batcher = self.enqueue(batcher, message).await;
222 }
223
224 self.flush(batcher).await;
226 }
227 }
228
229 async fn enqueue(&self, mut batcher: Batcher, message: BatchMessage) -> Batcher {
230 match batcher.push(message) {
231 Ok(None) => (),
232 Ok(Some(message)) => {
233 self.flush(batcher).await;
234 batcher = Batcher::new(None);
235 batcher
236 .push(message)
237 .expect("message cannot fail to enqueue twice");
238 }
239 Err(e) => {
240 warn!("error enqueueing segment message: {}", e);
241 }
242 }
243 batcher
244 }
245
246 async fn flush(&self, batcher: Batcher) {
247 let message = batcher.into_message();
248 if matches!(&message, Message::Batch(Batch { batch , .. }) if batch.is_empty()) {
249 return;
250 }
251 if let Err(e) = self.http_client.send(self.api_key.clone(), message).await {
252 warn!("error sending message to segment: {}", e);
253 }
254 }
255}