segment/batcher.rs

1//! Utilities for batching up messages.
2
3use crate::message::{Batch, BatchMessage, Message};
4use crate::{Error, Result};
5use serde_json::{Map, Value};
6use time::OffsetDateTime;
7
/// Maximum serialized size, in bytes, of a single message (32 KiB);
/// `push` rejects anything larger with `Error::MessageTooLarge`.
const MAX_MESSAGE_SIZE: usize = 1024 * 32;
/// Maximum serialized size, in bytes, of one batch of messages (512 KiB);
/// `push` hands a message back to the caller rather than exceed this.
const MAX_BATCH_SIZE: usize = 1024 * 512;
10
/// A batcher can accept messages into an internal buffer, and report when
/// messages must be flushed.
///
/// The recommended usage pattern looks something like this:
///
/// ```
/// use segment::{Batcher, Client, HttpClient};
/// use segment::message::{BatchMessage, Track, User};
/// use serde_json::json;
///
/// let mut batcher = Batcher::new(None);
/// let client = HttpClient::default();
///
/// for i in 0..100 {
///     let msg = Track {
///         user: User::UserId { user_id: format!("user-{}", i) },
///         event: "Example".to_owned(),
///         properties: json!({ "foo": "bar" }),
///         ..Default::default()
///     };
///
///     // Batcher returns back ownership of a message if the internal buffer
///     // would overflow.
///     //
///     // When this occurs, we flush the batcher, create a new batcher, and add
///     // the message into the new batcher.
///     if let Some(msg) = batcher.push(msg).unwrap() {
///         client.send("your_write_key".to_string(), batcher.into_message());
///         batcher = Batcher::new(None);
///         batcher.push(msg).unwrap();
///     }
/// }
/// ```
///
/// Batcher will attempt to fit messages into maximally-sized batches, thus
/// reducing the number of round trips required with Segment's tracking API.
/// However, if you produce messages infrequently, this may significantly delay
/// the sending of messages to Segment.
///
/// If this delay is a concern, it is recommended that you periodically flush
/// the batcher on your own by calling `into_message`.
///
/// By default, if the message you push in the batcher does not contain any
/// timestamp, the timestamp at the time of the push will be automatically
/// added to your message.
/// You can disable this behaviour with the [`Batcher::without_auto_timestamp`]
/// method though.
#[derive(Clone, Debug)]
pub struct Batcher {
    // Messages accepted so far, in push order.
    pub(crate) buf: Vec<BatchMessage>,
    // Running serialized size of `buf`, including one extra byte per message
    // for the commas separating them; compared against MAX_BATCH_SIZE.
    pub(crate) byte_count: usize,
    // Optional `context` object attached to the batch built by `into_message`.
    pub(crate) context: Option<Value>,
    // When `true` (the default), pushed messages lacking a timestamp are
    // stamped with the time of the push.
    pub(crate) auto_timestamp: bool,
}
65
66impl Batcher {
67    /// Construct a new, empty batcher.
68    ///
69    /// Optionally, you may specify a `context` that should be set on every
70    /// batch returned by `into_message`.
71    pub fn new(context: Option<Value>) -> Self {
72        Self {
73            buf: Vec::new(),
74            byte_count: 0,
75            context,
76            auto_timestamp: true,
77        }
78    }
79
80    pub fn without_auto_timestamp(&mut self) {
81        self.auto_timestamp = false;
82    }
83
84    pub fn is_empty(&self) -> bool {
85        self.buf.is_empty()
86    }
87
88    /// Push a message into the batcher.
89    ///
90    /// Returns `Ok(None)` if the message was accepted and is now owned by the
91    /// batcher.
92    ///
93    /// Returns `Ok(Some(msg))` if the message was rejected because the current
94    /// batch would be oversized if this message were accepted. The given
95    /// message is returned back, and it is recommended that you flush the
96    /// current batch before attempting to push `msg` in again.
97    ///
98    /// Returns an error if the message is too large to be sent to Segment's
99    /// API.
100    pub fn push(&mut self, msg: impl Into<BatchMessage>) -> Result<Option<BatchMessage>> {
101        let mut msg: BatchMessage = msg.into();
102        let timestamp = msg.timestamp_mut();
103        if self.auto_timestamp && timestamp.is_none() {
104            *timestamp = Some(OffsetDateTime::now_utc());
105        }
106        let size = serde_json::to_vec(&msg)?.len();
107        if size > MAX_MESSAGE_SIZE {
108            return Err(Error::MessageTooLarge);
109        }
110
111        self.byte_count += size + 1; // +1 to account for Serialized data's extra commas
112        if self.byte_count > MAX_BATCH_SIZE {
113            return Ok(Some(msg));
114        }
115
116        self.buf.push(msg);
117        Ok(None)
118    }
119
120    pub(crate) fn take(&mut self) -> Vec<BatchMessage> {
121        self.byte_count = 0;
122        std::mem::take(&mut self.buf)
123    }
124
125    /// Consumes this batcher and converts it into a message that can be sent to
126    /// Segment.
127    pub fn into_message(self) -> Message {
128        Message::Batch(Batch {
129            batch: self.buf,
130            context: self.context,
131            integrations: None,
132            extra: Map::default(),
133        })
134    }
135}
136
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::{Track, User};
    use serde_json::json;

    /// A pushed message round-trips through `into_message` unchanged, and the
    /// batch carries the context supplied at construction.
    #[test]
    fn test_push_and_into() {
        let batch_msg = BatchMessage::Track(Track::default());

        let context = json!({
            "foo": "bar",
        });

        let mut batcher = Batcher::new(Some(context.clone()));
        batcher.without_auto_timestamp();
        let result = batcher.push(batch_msg.clone());
        assert_eq!(None, result.ok().unwrap());

        let inner_batch = match batcher.into_message() {
            Message::Batch(b) => b,
            _ => panic!("invalid message type"),
        };
        assert_eq!(context, inner_batch.context.unwrap());
        assert_eq!(1, inner_batch.batch.len());

        assert_eq!(inner_batch.batch, vec![batch_msg]);
    }

    /// A single message above MAX_MESSAGE_SIZE is an unrecoverable error.
    #[test]
    fn test_bad_message_size() {
        let batch_msg = Track {
            user: User::UserId {
                // "a".repeat is infallible; no need for the
                // String::from_utf8(...).unwrap() detour.
                user_id: "a".repeat(1024 * 33),
            },
            ..Default::default()
        };

        let mut batcher = Batcher::new(None);
        let err = batcher.push(batch_msg).err().unwrap();
        assert!(err.to_string().contains("message too large"));
    }

    /// Once the buffer would exceed MAX_BATCH_SIZE, `push` hands the message
    /// back to the caller instead of accepting it.
    #[test]
    fn test_max_buffer() {
        let batch_msg = Track {
            user: User::UserId {
                user_id: "a".repeat(1024 * 30),
            },
            ..Default::default()
        };

        let mut batcher = Batcher::new(None);
        batcher.without_auto_timestamp();
        let mut result = Ok(None);
        for _ in 0..20 {
            result = batcher.push(batch_msg.clone());
            if matches!(result, Ok(Some(_))) {
                break;
            }
        }

        let msg = result.ok().unwrap();
        assert_eq!(BatchMessage::from(batch_msg), msg.unwrap());
    }
}