1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141
//! Utilities for batching up messages.
//!
//! When a batch is full, it is automatically sent over the network.
use serde_json::Map;
use crate::{
batcher::Batcher,
client::Client,
errors::Result,
http::HttpClient,
message::{Batch, BatchMessage, Message},
};
/// A [`Batcher`] paired with an [`HttpClient`] and a write key, so that full
/// batches are sent to Segment automatically instead of by hand.
///
/// Messages given to [`Self::push`] are buffered in the wrapped batcher. When
/// the batcher hands a message back because the current batch is full, the
/// buffered batch is sent with [`Self::flush`] and the message is pushed again.
///
/// The recommended usage pattern looks something like this:
///
/// ```
/// use segment::{AutoBatcher, Batcher, HttpClient};
/// use segment::message::{BatchMessage, Track, User};
/// use serde_json::json;
///
/// let client = HttpClient::default();
/// let batcher = Batcher::new(None);
/// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
///
/// for i in 0..100 {
///     let msg = Track {
///         user: User::UserId { user_id: format!("user-{}", i) },
///         event: "Example".to_owned(),
///         properties: json!({ "foo": "bar" }),
///         ..Default::default()
///     };
///
///     batcher.push(msg); // .await
/// }
/// ```
///
/// `AutoBatcher` will attempt to fit messages into maximally-sized batches,
/// thus reducing the number of round trips required with Segment's tracking
/// API. However, if you produce messages infrequently, this may significantly
/// delay the sending of messages to Segment.
///
/// If this delay is a concern, it is recommended that you periodically flush
/// the batcher on your own by calling [Self::flush].
#[derive(Clone)]
pub struct AutoBatcher {
/// Client that [`Self::flush`] uses to send each batch.
client: HttpClient,
/// Wrapped batcher; its `buf` holds the pending messages and its `context`
/// is attached to every batch that is sent.
batcher: Batcher,
/// Write key passed to the client alongside every batch.
key: String,
}
impl AutoBatcher {
    /// Construct an `AutoBatcher` from a client, a batcher and a write key.
    ///
    /// * `client` - HTTP client used to send batches.
    /// * `batcher` - batcher that buffers the messages; anything it already
    ///   holds is kept and goes out with the next flush.
    /// * `key` - write key passed to the client with every batch.
    ///
    /// ```
    /// use segment::{AutoBatcher, Batcher, HttpClient};
    ///
    /// let client = HttpClient::default();
    /// let batcher = Batcher::new(None);
    /// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
    /// ```
    pub fn new(client: HttpClient, batcher: Batcher, key: String) -> Self {
        Self {
            batcher,
            client,
            key,
        }
    }

    /// Push a message into the batcher.
    ///
    /// If the current batch is full, it is sent first and the message becomes
    /// the first entry of the next batch.
    ///
    /// # Errors
    ///
    /// Returns an error if the message is too large to be sent to Segment's
    /// API, or if sending the full batch fails.
    ///
    /// ```
    /// use serde_json::json;
    /// use segment::{AutoBatcher, Batcher, HttpClient};
    /// use segment::message::{BatchMessage, Track, User};
    ///
    /// let client = HttpClient::default();
    /// let batcher = Batcher::new(None);
    /// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
    ///
    /// let msg = BatchMessage::Track(Track {
    ///     user: User::UserId { user_id: String::from("user") },
    ///     event: "Example".to_owned(),
    ///     properties: json!({ "foo": "bar" }),
    ///     ..Default::default()
    /// });
    ///
    /// batcher.push(msg); // .await
    /// ```
    pub async fn push(&mut self, msg: impl Into<BatchMessage>) -> Result<()> {
        if let Some(msg) = self.batcher.push(msg)? {
            self.flush().await?;
            // The batcher cannot hand the message back again (`Some`) here: its
            // buffer was just emptied, and a message larger than a whole batch
            // is expected to be rejected with an error instead.
            self.batcher.push(msg)?;
        }
        Ok(())
    }

    /// Send all the messages currently buffered, whether or not the batch is
    /// full.
    ///
    /// When nothing is buffered this returns `Ok(())` without making a
    /// request, so calling it periodically is cheap.
    ///
    /// # Errors
    ///
    /// Returns an error if the client fails to send the batch. The messages
    /// are taken out of the buffer before sending and are not put back on
    /// failure.
    ///
    /// ```
    /// use serde_json::json;
    /// use segment::{AutoBatcher, Batcher, HttpClient};
    /// use segment::message::{BatchMessage, Track, User};
    ///
    /// let client = HttpClient::default();
    /// let batcher = Batcher::new(None);
    /// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
    ///
    /// let msg = BatchMessage::Track(Track {
    ///     user: User::UserId { user_id: String::from("user") },
    ///     event: "Example".to_owned(),
    ///     properties: json!({ "foo": "bar" }),
    ///     ..Default::default()
    /// });
    ///
    /// batcher.push(msg); // .await
    /// batcher.flush(); // .await
    /// ```
    pub async fn flush(&mut self) -> Result<()> {
        // Nothing buffered: skip the network round trip entirely.
        if self.batcher.buf.is_empty() {
            return Ok(());
        }

        // NOTE(review): only `buf` is emptied here. If `Batcher` tracks the
        // buffered size in a separate field, that field is not reset - confirm
        // against the `Batcher` definition.
        let message = Message::Batch(Batch {
            batch: std::mem::take(&mut self.batcher.buf),
            context: self.batcher.context.clone(),
            integrations: None,
            extra: Map::default(),
        });

        // `send` takes the key by value, so it is cloned for every batch.
        self.client.send(self.key.clone(), message).await?;
        Ok(())
    }
}