segment/auto_batcher.rs

//! Utilities for batching up messages.
//! When a batch is full, it is automatically sent over the network.
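//!
//! A minimal end-to-end sketch, assuming the crate-level re-exports used in
//! the examples below; errors are ignored with `let _ =` for brevity:
//!
//! ```
//! # use segment::{AutoBatcher, Batcher, HttpClient};
//! # use segment::message::{Track, User};
//! # use serde_json::json;
//! # async fn run() {
//! let mut batcher = AutoBatcher::new(
//!     HttpClient::default(),
//!     Batcher::new(None),
//!     "your_write_key".to_string(),
//! );
//!
//! // Full batches are sent automatically as messages are pushed.
//! let _ = batcher.push(Track {
//!     user: User::UserId { user_id: "user-1".to_string() },
//!     event: "Example".to_owned(),
//!     properties: json!({ "foo": "bar" }),
//!     ..Default::default()
//! }).await;
//!
//! // Send whatever is still buffered instead of waiting for a full batch.
//! let _ = batcher.flush().await;
//! # }
//! ```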

use serde_json::Map;

use crate::{
    batcher::Batcher,
    client::Client,
    errors::Result,
    http::HttpClient,
    message::{Batch, BatchMessage, Message},
};

/// An `AutoBatcher` accepts messages into an internal buffer and automatically
/// sends the accumulated batch over the network once it is full.
///
/// The recommended usage pattern looks something like this:
///
/// ```
/// use segment::{AutoBatcher, Batcher, HttpClient};
/// use segment::message::{BatchMessage, Track, User};
/// use serde_json::json;
///
/// let client = HttpClient::default();
/// let batcher = Batcher::new(None);
/// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
///
/// for i in 0..100 {
///     let msg = Track {
///         user: User::UserId { user_id: format!("user-{}", i) },
///         event: "Example".to_owned(),
///         properties: json!({ "foo": "bar" }),
///         ..Default::default()
///     };
///
///     batcher.push(msg); // .await
/// }
/// ```
///
/// The batcher will attempt to fit messages into maximally-sized batches, thus
/// reducing the number of round trips to Segment's tracking API. However, if
/// you produce messages infrequently, this may significantly delay their
/// delivery to Segment.
///
/// If this delay is a concern, it is recommended that you periodically flush
/// the batcher on your own by calling [Self::flush], for example on a timer as
/// sketched below.
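///
/// A minimal sketch of such a periodic flush, assuming a Tokio runtime and its
/// interval timer (an assumption, not something this module provides); flush
/// errors are ignored here for brevity:
///
/// ```ignore
/// use std::time::Duration;
///
/// async fn flush_periodically(mut batcher: segment::AutoBatcher) {
///     let mut ticker = tokio::time::interval(Duration::from_secs(30));
///     loop {
///         ticker.tick().await;
///         // Send whatever has accumulated since the last tick.
///         let _ = batcher.flush().await;
///     }
/// }
/// ```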
#[derive(Clone, Debug)]
pub struct AutoBatcher {
    client: HttpClient,
    batcher: Batcher,
    key: String,
}

impl AutoBatcher {
    /// Construct a new, empty `AutoBatcher`.
    ///
    /// ```
    /// use segment::{AutoBatcher, Batcher, HttpClient};
    ///
    /// let client = HttpClient::default();
    /// let batcher = Batcher::new(None);
    /// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
    /// ```
    pub fn new(client: HttpClient, batcher: Batcher, key: String) -> Self {
        Self {
            batcher,
            client,
            key,
        }
    }

    /// Push a message into the batcher.
    /// If the batch is full, flush it over the network and then push the
    /// message into the now-empty batcher.
    ///
    /// Returns an error if the message is too large to be sent to Segment's
    /// API.
    ///
    /// ```
    /// use serde_json::json;
    /// use segment::{AutoBatcher, Batcher, HttpClient};
    /// use segment::message::{BatchMessage, Track, User};
    ///
    /// let client = HttpClient::default();
    /// let batcher = Batcher::new(None);
    /// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
    ///
    /// let msg = BatchMessage::Track(Track {
    ///     user: User::UserId { user_id: String::from("user") },
    ///     event: "Example".to_owned(),
    ///     properties: json!({ "foo": "bar" }),
    ///     ..Default::default()
    /// });
    ///
    /// batcher.push(msg); // .await
    /// ```
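    ///
    /// As noted above, `push` can fail, for example when a single message is
    /// too large to ever fit in a batch. A minimal sketch of handling that
    /// case (the recovery strategy shown is only illustrative):
    ///
    /// ```
    /// # use segment::AutoBatcher;
    /// # use segment::message::BatchMessage;
    /// # async fn run(batcher: &mut AutoBatcher, msg: BatchMessage) {
    /// if batcher.push(msg).await.is_err() {
    ///     // The event was rejected (e.g. it exceeds the maximum batch size):
    ///     // decide here whether to drop it, trim it, or report the failure.
    /// }
    /// # }
    /// ```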
    pub async fn push(&mut self, msg: impl Into<BatchMessage>) -> Result<()> {
        if let Some(msg) = self.batcher.push(msg)? {
            self.flush().await?;
            // The second push cannot hand the message back: the batcher is now
            // empty, and a message too large to fit in an empty batch would
            // already have made the first push return an error.
            self.batcher.push(msg)?;
        }

        Ok(())
    }

    /// Send all messages currently held in the batcher, regardless of whether
    /// the batch is full.
    ///
    /// Returns an error if the batch could not be sent to Segment's API.
    ///
    /// ```
    /// use serde_json::json;
    /// use segment::{AutoBatcher, Batcher, HttpClient};
    /// use segment::message::{BatchMessage, Track, User};
    ///
    /// let client = HttpClient::default();
    /// let batcher = Batcher::new(None);
    /// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
    ///
    /// let msg = BatchMessage::Track(Track {
    ///     user: User::UserId { user_id: String::from("user") },
    ///     event: "Example".to_owned(),
    ///     properties: json!({ "foo": "bar" }),
    ///     ..Default::default()
    /// });
    ///
    /// batcher.push(msg); // .await
    /// batcher.flush(); // .await
    /// ```
    pub async fn flush(&mut self) -> Result<()> {
        // Nothing buffered, nothing to send.
        if self.batcher.is_empty() {
            return Ok(());
        }

        // Drain the buffered messages and wrap them in a single batch payload.
        let message = Message::Batch(Batch {
            batch: self.batcher.take(),
            context: self.batcher.context.clone(),
            integrations: None,
            extra: Map::default(),
        });

        self.client.send(self.key.to_string(), message).await?;
        Ok(())
    }
}