segment/batcher.rs
1//! Utilities for batching up messages.
2
3use crate::message::{Batch, BatchMessage, Message};
4use crate::{Error, Result};
5use serde_json::{Map, Value};
6use time::OffsetDateTime;
7
8const MAX_MESSAGE_SIZE: usize = 1024 * 32;
9const MAX_BATCH_SIZE: usize = 1024 * 512;
10
/// A batcher can accept messages into an internal buffer, and report when
/// messages must be flushed.
///
/// The recommended usage pattern looks something like this:
///
/// ```
/// use segment::{Batcher, Client, HttpClient};
/// use segment::message::{BatchMessage, Track, User};
/// use serde_json::json;
///
/// let mut batcher = Batcher::new(None);
/// let client = HttpClient::default();
///
/// for i in 0..100 {
///     let msg = Track {
///         user: User::UserId { user_id: format!("user-{}", i) },
///         event: "Example".to_owned(),
///         properties: json!({ "foo": "bar" }),
///         ..Default::default()
///     };
///
///     // Batcher returns back ownership of a message if the internal buffer
///     // would overflow.
///     //
///     // When this occurs, we flush the batcher, create a new batcher, and add
///     // the message into the new batcher.
///     if let Some(msg) = batcher.push(msg).unwrap() {
///         client.send("your_write_key".to_string(), batcher.into_message());
///         batcher = Batcher::new(None);
///         batcher.push(msg).unwrap();
///     }
/// }
/// ```
///
/// Batcher will attempt to fit messages into maximally-sized batches, thus
/// reducing the number of round trips required with Segment's tracking API.
/// However, if you produce messages infrequently, this may significantly delay
/// the sending of messages to Segment.
///
/// If this delay is a concern, it is recommended that you periodically flush
/// the batcher on your own by calling `into_message`.
///
/// By default, if a message pushed into the batcher does not contain a
/// timestamp, the timestamp at the time of the push is automatically added
/// to the message.
/// You can disable this behaviour with the [`Self::without_auto_timestamp`]
/// method.
#[derive(Clone, Debug)]
pub struct Batcher {
    /// Messages accepted so far, in push order.
    pub(crate) buf: Vec<BatchMessage>,
    /// Running estimate of the serialized size of `buf`, in bytes.
    pub(crate) byte_count: usize,
    /// Optional context object attached to every batch produced.
    pub(crate) context: Option<Value>,
    /// Whether `push` stamps messages that lack a timestamp.
    pub(crate) auto_timestamp: bool,
}
65
66impl Batcher {
67 /// Construct a new, empty batcher.
68 ///
69 /// Optionally, you may specify a `context` that should be set on every
70 /// batch returned by `into_message`.
71 pub fn new(context: Option<Value>) -> Self {
72 Self {
73 buf: Vec::new(),
74 byte_count: 0,
75 context,
76 auto_timestamp: true,
77 }
78 }
79
80 pub fn without_auto_timestamp(&mut self) {
81 self.auto_timestamp = false;
82 }
83
84 pub fn is_empty(&self) -> bool {
85 self.buf.is_empty()
86 }
87
88 /// Push a message into the batcher.
89 ///
90 /// Returns `Ok(None)` if the message was accepted and is now owned by the
91 /// batcher.
92 ///
93 /// Returns `Ok(Some(msg))` if the message was rejected because the current
94 /// batch would be oversized if this message were accepted. The given
95 /// message is returned back, and it is recommended that you flush the
96 /// current batch before attempting to push `msg` in again.
97 ///
98 /// Returns an error if the message is too large to be sent to Segment's
99 /// API.
100 pub fn push(&mut self, msg: impl Into<BatchMessage>) -> Result<Option<BatchMessage>> {
101 let mut msg: BatchMessage = msg.into();
102 let timestamp = msg.timestamp_mut();
103 if self.auto_timestamp && timestamp.is_none() {
104 *timestamp = Some(OffsetDateTime::now_utc());
105 }
106 let size = serde_json::to_vec(&msg)?.len();
107 if size > MAX_MESSAGE_SIZE {
108 return Err(Error::MessageTooLarge);
109 }
110
111 self.byte_count += size + 1; // +1 to account for Serialized data's extra commas
112 if self.byte_count > MAX_BATCH_SIZE {
113 return Ok(Some(msg));
114 }
115
116 self.buf.push(msg);
117 Ok(None)
118 }
119
120 pub(crate) fn take(&mut self) -> Vec<BatchMessage> {
121 self.byte_count = 0;
122 std::mem::take(&mut self.buf)
123 }
124
125 /// Consumes this batcher and converts it into a message that can be sent to
126 /// Segment.
127 pub fn into_message(self) -> Message {
128 Message::Batch(Batch {
129 batch: self.buf,
130 context: self.context,
131 integrations: None,
132 extra: Map::default(),
133 })
134 }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::{Track, User};
    use serde_json::json;

    /// Pushing one message and consuming the batcher yields a batch
    /// containing exactly that message plus the configured context.
    #[test]
    fn test_push_and_into() {
        let batch_msg = BatchMessage::Track(Track {
            ..Default::default()
        });

        let context = json!({
            "foo": "bar",
        });

        let mut batcher = Batcher::new(Some(context.clone()));
        batcher.without_auto_timestamp();
        let result = batcher.push(batch_msg.clone());
        assert_eq!(None, result.ok().unwrap());

        let batch = batcher.into_message();
        let inner_batch = match batch {
            Message::Batch(b) => b,
            _ => panic!("invalid message type"),
        };
        assert_eq!(context, inner_batch.context.unwrap());
        assert_eq!(1, inner_batch.batch.len());

        assert_eq!(inner_batch.batch, vec![batch_msg]);
    }

    /// A single message larger than `MAX_MESSAGE_SIZE` (32 KiB) is rejected
    /// outright with `Error::MessageTooLarge`.
    #[test]
    fn test_bad_message_size() {
        let batch_msg = Track {
            user: User::UserId {
                // 33 KiB of payload exceeds the 32 KiB per-message limit.
                user_id: "a".repeat(1024 * 33),
            },
            ..Default::default()
        };

        let mut batcher = Batcher::new(None);
        let err = batcher.push(batch_msg).err().unwrap();
        assert!(err.to_string().contains("message too large"));
    }

    /// Pushing messages until the batch would exceed `MAX_BATCH_SIZE` hands
    /// the overflowing message back to the caller unmodified.
    #[test]
    fn test_max_buffer() {
        let batch_msg = Track {
            user: User::UserId {
                // 30 KiB per message: under the per-message cap, but enough
                // that ~17 of them overflow the 512 KiB batch cap.
                user_id: "a".repeat(1024 * 30),
            },
            ..Default::default()
        };

        let mut batcher = Batcher::new(None);
        batcher.without_auto_timestamp();
        let mut result = Ok(None);
        for _ in 0..20 {
            result = batcher.push(batch_msg.clone());
            if matches!(result, Ok(Some(_))) {
                break;
            }
        }

        let msg = result.ok().unwrap();
        assert_eq!(BatchMessage::from(batch_msg), msg.unwrap());
    }
}
208}