segment/
auto_batcher.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
//! Utilities for batching up messages.
//!
//! When a batch is full, it is automatically sent over the network.

use serde_json::Map;

use crate::{
    batcher::Batcher,
    client::Client,
    errors::Result,
    http::HttpClient,
    message::{Batch, BatchMessage, Message},
};

/// A batcher that accepts messages into an internal buffer and automatically
/// sends the buffered batch once it is full.
///
/// The recommended usage pattern looks something like this:
///
/// ```
/// use segment::{AutoBatcher, Batcher, HttpClient};
/// use segment::message::{BatchMessage, Track, User};
/// use serde_json::json;
///
/// let client = HttpClient::default();
/// let batcher = Batcher::new(None);
/// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
///
/// for i in 0..100 {
///     let msg = Track {
///         user: User::UserId { user_id: format!("user-{}", i) },
///         event: "Example".to_owned(),
///         properties: json!({ "foo": "bar" }),
///         ..Default::default()
///     };
///
///     batcher.push(msg); // .await
/// }
/// ```
///
/// The batcher will attempt to fit messages into maximally-sized batches, thus
/// reducing the number of round trips required with Segment's tracking API.
/// However, if you produce messages infrequently, this may significantly delay
/// the sending of messages to Segment.
///
/// If this delay is a concern, it is recommended that you periodically flush
/// the batcher on your own by calling [Self::flush].
#[derive(Clone)]
pub struct AutoBatcher {
    // HTTP client used to send the batches.
    client: HttpClient,
    // Underlying batcher holding the buffered messages and the batch context.
    batcher: Batcher,
    // Write key handed to the client with every batch that is sent.
    key: String,
}

impl AutoBatcher {
    /// Construct a new `AutoBatcher` wrapping the given `batcher`.
    ///
    /// Batches are sent through `client`, and `key` is handed to the client
    /// as the write key with every batch.
    ///
    /// ```
    /// use segment::{AutoBatcher, Batcher, HttpClient};
    ///
    /// let client = HttpClient::default();
    /// let batcher = Batcher::new(None);
    /// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
    /// ```
    pub fn new(client: HttpClient, batcher: Batcher, key: String) -> Self {
        Self {
            client,
            batcher,
            key,
        }
    }

    /// Push a message into the batcher.
    ///
    /// If the batcher is full, the buffered batch is sent first and the
    /// message is then pushed into the emptied batcher.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying [`Batcher`] rejects the message
    /// (for example because it is too large to be sent to Segment's API), or
    /// if sending a full batch fails.
    ///
    /// ```
    /// use serde_json::json;
    /// use segment::{AutoBatcher, Batcher, HttpClient};
    /// use segment::message::{BatchMessage, Track, User};
    ///
    /// let client = HttpClient::default();
    /// let batcher = Batcher::new(None);
    /// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
    ///
    /// let msg = BatchMessage::Track(Track {
    ///     user: User::UserId { user_id: String::from("user") },
    ///     event: "Example".to_owned(),
    ///     properties: json!({ "foo": "bar" }),
    ///     ..Default::default()
    /// });
    ///
    /// batcher.push(msg); // .await
    /// ```
    pub async fn push(&mut self, msg: impl Into<BatchMessage>) -> Result<()> {
        // `Some(msg)` means the batcher had no room left and handed the
        // message back to us.
        if let Some(msg) = self.batcher.push(msg)? {
            self.flush().await?;
            // This is not expected to return `Some` again: the batcher has just
            // been emptied, and a message larger than the maximum size is
            // supposed to be reported as an error instead.
            // NOTE(review): a `Some` returned here is silently dropped — confirm
            // the batcher really cannot reject a message after a flush.
            self.batcher.push(msg)?;
        }

        Ok(())
    }

    /// Send all the messages currently contained in the batcher, whether the
    /// batch is full or not.
    ///
    /// Does nothing if no message is buffered.
    ///
    /// # Errors
    ///
    /// Returns an error if sending the batch fails. The buffered messages are
    /// removed from the batcher before the send is attempted, so they are not
    /// kept for a retry when it fails.
    ///
    /// ```
    /// use serde_json::json;
    /// use segment::{AutoBatcher, Batcher, HttpClient};
    /// use segment::message::{BatchMessage, Track, User};
    ///
    /// let client = HttpClient::default();
    /// let batcher = Batcher::new(None);
    /// let mut batcher = AutoBatcher::new(client, batcher, "your_write_key".to_string());
    ///
    /// let msg = BatchMessage::Track(Track {
    ///     user: User::UserId { user_id: String::from("user") },
    ///     event: "Example".to_owned(),
    ///     properties: json!({ "foo": "bar" }),
    ///     ..Default::default()
    /// });
    ///
    /// batcher.push(msg); // .await
    /// batcher.flush(); // .await
    /// ```
    pub async fn flush(&mut self) -> Result<()> {
        // Nothing buffered: skip the network round trip instead of sending an
        // empty batch.
        if self.batcher.buf.is_empty() {
            return Ok(());
        }

        // Move the buffered messages out, leaving an empty buffer behind.
        // NOTE(review): if `Batcher` tracks the accumulated batch size
        // separately from `buf`, it probably needs to be reset here — confirm.
        let message = Message::Batch(Batch {
            batch: std::mem::take(&mut self.batcher.buf),
            context: self.batcher.context.clone(),
            integrations: None,
            extra: Map::default(),
        });

        self.client.send(self.key.clone(), message).await?;
        Ok(())
    }
}