segment/auto_batcher.rs
//! Utilities for batching up messages.
//! When a batch is full, it is automatically sent over the network.
3
4use serde_json::Map;
5
6use crate::{
7 batcher::Batcher,
8 client::Client,
9 errors::Result,
10 http::HttpClient,
11 message::{Batch, BatchMessage, Message},
12};
13
14/// A batcher can accept messages into an internal buffer, and report when
15/// messages must be flushed.
16///
17/// The recommended usage pattern looks something like this:
18///
19/// ```
20/// use segment::{AutoBatcher, Batcher, HttpClient};
21/// use segment::message::{BatchMessage, Track, User};
22/// use serde_json::json;
23///
24/// let client = HttpClient::default();
25/// let batcher= Batcher::new(None);
26/// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
27///
28/// for i in 0..100 {
29/// let msg = Track {
30/// user: User::UserId { user_id: format!("user-{}", i) },
31/// event: "Example".to_owned(),
32/// properties: json!({ "foo": "bar" }),
33/// ..Default::default()
34/// };
35///
36/// batcher.push(msg); // .await
37/// }
38/// ```
39///
40/// Batcher will attempt to fit messages into maximally-sized batches, thus
41/// reducing the number of round trips required with Segment's tracking API.
42/// However, if you produce messages infrequently, this may significantly delay
43/// the sending of messages to Segment.
44///
45/// If this delay is a concern, it is recommended that you periodically flush
46/// the batcher on your own by calling [Self::flush].
47#[derive(Clone, Debug)]
48pub struct AutoBatcher {
49 client: HttpClient,
50 batcher: Batcher,
51 key: String,
52}
53
54impl AutoBatcher {
55 /// Construct a new, empty batcher.
56 ///
57 /// ```
58 /// use segment::{AutoBatcher, Batcher, HttpClient};
59 ///
60 /// let client = HttpClient::default();
61 /// let batcher = Batcher::new(None);
62 /// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
63 /// ```
64 pub fn new(client: HttpClient, batcher: Batcher, key: String) -> Self {
65 Self {
66 batcher,
67 client,
68 key,
69 }
70 }
71
72 /// Push a message into the batcher.
73 /// If the batcher is full, send it and create a new batcher with the message.
74 ///
75 /// Returns an error if the message is too large to be sent to Segment's
76 /// API.
77 ///
78 /// ```
79 /// use serde_json::json;
80 /// use segment::{AutoBatcher, Batcher, HttpClient};
81 /// use segment::message::{BatchMessage, Track, User};
82 ///
83 /// let client = HttpClient::default();
84 /// let batcher = Batcher::new(None);
85 /// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
86 ///
87 /// let msg = BatchMessage::Track(Track {
88 /// user: User::UserId { user_id: String::from("user") },
89 /// event: "Example".to_owned(),
90 /// properties: json!({ "foo": "bar" }),
91 /// ..Default::default()
92 /// });
93 ///
94 /// batcher.push(msg); // .await
95 /// ```
96 pub async fn push(&mut self, msg: impl Into<BatchMessage>) -> Result<()> {
97 if let Some(msg) = self.batcher.push(msg)? {
98 self.flush().await?;
99 // this can't return None: the batcher is empty and if the message is
100 // larger than the max size of the batcher it's supposed to throw an error
101 self.batcher.push(msg)?;
102 }
103
104 Ok(())
105 }
106
107 /// Send all the message currently contained in the batcher, full or empty.
108 ///
109 /// Returns an error if the message is too large to be sent to Segment's
110 /// API.
111 /// ```
112 /// use serde_json::json;
113 /// use segment::{AutoBatcher, Batcher, HttpClient};
114 /// use segment::message::{BatchMessage, Track, User};
115 ///
116 /// let client = HttpClient::default();
117 /// let batcher = Batcher::new(None);
118 /// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
119 ///
120 /// let msg = BatchMessage::Track(Track {
121 /// user: User::UserId { user_id: String::from("user") },
122 /// event: "Example".to_owned(),
123 /// properties: json!({ "foo": "bar" }),
124 /// ..Default::default()
125 /// });
126 ///
127 /// batcher.push(msg); // .await
128 /// batcher.flush(); // .await
129 /// ```
130 pub async fn flush(&mut self) -> Result<()> {
131 if self.batcher.is_empty() {
132 return Ok(());
133 }
134
135 let message = Message::Batch(Batch {
136 batch: self.batcher.take(),
137 context: self.batcher.context.clone(),
138 integrations: None,
139 extra: Map::default(),
140 });
141
142 self.client.send(self.key.to_string(), message).await?;
143 Ok(())
144 }
145}