launchdarkly_server_sdk/events/
sender.rs
1use crate::{
2 reqwest::is_http_error_recoverable, LAUNCHDARKLY_EVENT_SCHEMA_HEADER,
3 LAUNCHDARKLY_PAYLOAD_ID_HEADER,
4};
5use chrono::DateTime;
6use crossbeam_channel::Sender;
7use std::collections::HashMap;
8
9#[cfg(feature = "event-compression")]
10use flate2::write::GzEncoder;
11#[cfg(feature = "event-compression")]
12use flate2::Compression;
13#[cfg(feature = "event-compression")]
14use std::io::Write;
15
16use futures::future::BoxFuture;
17use hyper::{client::connect::Connection, service::Service, Uri};
18use tokio::{
19 io::{AsyncRead, AsyncWrite},
20 time::{sleep, Duration},
21};
22use uuid::Uuid;
23
24use super::event::OutputEvent;
25
/// Outcome of one `send_event_data` call, delivered to the caller over the
/// result channel that was passed in.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EventSenderResult {
    /// Server time in milliseconds since the Unix epoch, as read from the
    /// response `date` header; `0` when no usable time was obtained.
    pub time_from_server: u128,
    /// `true` when the payload was accepted.
    pub success: bool,
    /// Set by `HyperEventSender` when the server answered with a status that
    /// `is_http_error_recoverable` rejects.
    // NOTE(review): presumably the receiver stops sending events when this is
    // set — confirm against the consumer of the result channel.
    pub must_shutdown: bool,
}
31
32pub trait EventSender: Send + Sync {
33 fn send_event_data(
34 &self,
35 events: Vec<OutputEvent>,
36 result_tx: Sender<EventSenderResult>,
37 ) -> BoxFuture<()>;
38}
39
40#[derive(Clone)]
41pub struct HyperEventSender<C> {
42 url: hyper::Uri,
43 sdk_key: String,
44 http: hyper::Client<C>,
45 default_headers: HashMap<&'static str, String>,
46
47 #[allow(dead_code)]
49 compress_events: bool,
50}
51
52impl<C> HyperEventSender<C>
53where
54 C: Service<Uri> + Clone + Send + Sync + 'static,
55 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
56 C::Future: Send + Unpin + 'static,
57 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
58{
59 pub fn new(
60 connector: C,
61 url: hyper::Uri,
62 sdk_key: &str,
63 default_headers: HashMap<&'static str, String>,
64 compress_events: bool,
65 ) -> Self {
66 Self {
67 url,
68 sdk_key: sdk_key.to_owned(),
69 http: hyper::Client::builder().build(connector),
70 default_headers,
71 compress_events,
72 }
73 }
74
75 fn get_server_time_from_response<Body>(&self, response: &hyper::Response<Body>) -> u128 {
76 let date_value = response
77 .headers()
78 .get("date")
79 .unwrap_or(&crate::EMPTY_HEADER)
80 .to_str()
81 .unwrap_or("")
82 .to_owned();
83
84 match DateTime::parse_from_rfc2822(&date_value) {
85 Ok(date) => date.timestamp_millis() as u128,
86 Err(_) => 0,
87 }
88 }
89}
90
91impl<C> EventSender for HyperEventSender<C>
92where
93 C: Service<Uri> + Clone + Send + Sync + 'static,
94 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
95 C::Future: Send + Unpin + 'static,
96 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
97{
98 fn send_event_data(
99 &self,
100 events: Vec<OutputEvent>,
101 result_tx: Sender<EventSenderResult>,
102 ) -> BoxFuture<()> {
103 Box::pin(async move {
104 let uuid = Uuid::new_v4();
105
106 debug!(
107 "Sending ({}): {}",
108 uuid,
109 serde_json::to_string_pretty(&events).unwrap_or_else(|e| e.to_string())
110 );
111
112 #[allow(unused_mut)]
114 let mut payload = match serde_json::to_vec(&events) {
115 Ok(json) => json,
116 Err(e) => {
117 error!(
118 "Failed to serialize event payload. Some events were dropped: {:?}",
119 e
120 );
121 return;
122 }
123 };
124
125 #[allow(unused_mut)]
127 let mut additional_headers = self.default_headers.clone();
128
129 #[cfg(feature = "event-compression")]
130 if self.compress_events {
131 let mut e = GzEncoder::new(Vec::new(), Compression::default());
132 if e.write_all(payload.as_slice()).is_ok() {
133 if let Ok(compressed) = e.finish() {
134 payload = compressed;
135 additional_headers.insert("Content-Encoding", "gzip".into());
136 }
137 }
138 }
139
140 for attempt in 1..=2 {
141 if attempt == 2 {
142 sleep(Duration::from_secs(1)).await;
143 }
144
145 let mut request_builder = hyper::Request::builder()
146 .uri(self.url.clone())
147 .method("POST")
148 .header("Content-Type", "application/json")
149 .header("Authorization", self.sdk_key.clone())
150 .header("User-Agent", &*crate::USER_AGENT)
151 .header(
152 LAUNCHDARKLY_EVENT_SCHEMA_HEADER,
153 crate::CURRENT_EVENT_SCHEMA,
154 )
155 .header(LAUNCHDARKLY_PAYLOAD_ID_HEADER, uuid.to_string());
156
157 for default_header in &additional_headers {
158 request_builder =
159 request_builder.header(*default_header.0, default_header.1.as_str());
160 }
161 let request = request_builder.body(hyper::Body::from(payload.clone()));
162
163 let result = self.http.request(request.unwrap()).await;
164
165 let response = match result {
166 Ok(response) => response,
167 Err(_) if attempt == 1 => continue,
168 Err(e) => {
169 error!("Failed to send events. Some events were dropped: {:?}", e);
172 result_tx
173 .send(EventSenderResult {
174 success: false,
175 time_from_server: 0,
176 must_shutdown: false,
177 })
178 .unwrap();
179 return;
180 }
181 };
182
183 if response.status().is_success() {
184 let _ = result_tx.send(EventSenderResult {
185 success: true,
186 time_from_server: self.get_server_time_from_response(&response),
187 must_shutdown: false,
188 });
189 return;
190 }
191
192 if !is_http_error_recoverable(response.status().as_u16()) {
193 result_tx
194 .send(EventSenderResult {
195 success: false,
196 time_from_server: 0,
197 must_shutdown: true,
198 })
199 .unwrap();
200 return;
201 }
202 }
203
204 result_tx
205 .send(EventSenderResult {
206 success: false,
207 time_from_server: 0,
208 must_shutdown: false,
209 })
210 .unwrap();
211 })
212 }
213}
214
/// Test-only [`EventSender`] that hands every event to an in-process channel
/// instead of sending it over the network.
#[cfg(test)]
pub(crate) struct InMemoryEventSender {
    /// Channel each received event is forwarded to.
    event_tx: Sender<OutputEvent>,
}
219
#[cfg(test)]
impl InMemoryEventSender {
    /// Creates a sender that forwards every event it is given to `event_tx`.
    pub(crate) fn new(event_tx: Sender<OutputEvent>) -> Self {
        InMemoryEventSender { event_tx }
    }
}
226
#[cfg(test)]
impl EventSender for InMemoryEventSender {
    /// Forwards each event to the in-memory channel, then reports a
    /// successful result with `must_shutdown` set.
    ///
    /// Panics if either channel has been disconnected.
    fn send_event_data(
        &self,
        events: Vec<OutputEvent>,
        sender: Sender<EventSenderResult>,
    ) -> BoxFuture<()> {
        Box::pin(async move {
            events
                .into_iter()
                .for_each(|event| self.event_tx.send(event).unwrap());

            let outcome = EventSenderResult {
                time_from_server: 0,
                success: true,
                must_shutdown: true,
            };
            sender.send(outcome).unwrap();
        })
    }
}
249
#[cfg(test)]
mod tests {
    use super::*;
    use crossbeam_channel::bounded;
    use std::str::FromStr;
    use test_case::test_case;

    /// `date` header value served by the mock server in the success cases.
    const SERVER_DATE: &str = "Fri, 13 Feb 2009 23:31:30 GMT";
    /// `SERVER_DATE` expressed in milliseconds since the Unix epoch.
    const SERVER_DATE_MILLIS: u128 = 1234567890000;

    /// Creates a sender that posts to `<url>/bulk` with no extra headers and
    /// compression turned off.
    fn build_event_sender(url: String) -> HyperEventSender<hyper::client::HttpConnector> {
        let bulk_url = hyper::Uri::from_str(&format!("{}/bulk", &url))
            .expect("Failed parsing the mock server url");

        HyperEventSender::new(
            hyper::client::HttpConnector::new(),
            bulk_url,
            "sdk-key",
            HashMap::<&str, String>::new(),
            false,
        )
    }

    /// Sends an empty batch to the server at `base_url` and returns whatever
    /// the sender reported on the result channel.
    async fn send_empty_batch(
        base_url: String,
    ) -> Result<EventSenderResult, crossbeam_channel::RecvError> {
        let (result_tx, result_rx) = bounded::<EventSenderResult>(5);
        let event_sender = build_event_sender(base_url);

        event_sender.send_event_data(vec![], result_tx).await;

        result_rx.recv()
    }

    #[test_case(hyper::StatusCode::CONTINUE, true)]
    #[test_case(hyper::StatusCode::OK, true)]
    #[test_case(hyper::StatusCode::MULTIPLE_CHOICES, true)]
    #[test_case(hyper::StatusCode::BAD_REQUEST, true)]
    #[test_case(hyper::StatusCode::UNAUTHORIZED, false)]
    #[test_case(hyper::StatusCode::REQUEST_TIMEOUT, true)]
    #[test_case(hyper::StatusCode::CONFLICT, false)]
    #[test_case(hyper::StatusCode::TOO_MANY_REQUESTS, true)]
    #[test_case(hyper::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE, false)]
    #[test_case(hyper::StatusCode::INTERNAL_SERVER_ERROR, true)]
    fn can_determine_recoverable_errors(status: hyper::StatusCode, is_recoverable: bool) {
        let code = status.as_u16();
        assert_eq!(is_recoverable, is_http_error_recoverable(code));
    }

    #[tokio::test]
    async fn can_parse_server_time_from_response() {
        let mut server = mockito::Server::new_async().await;
        server
            .mock("POST", "/bulk")
            .with_status(200)
            .with_header("date", SERVER_DATE)
            .create_async()
            .await;

        let sender_result = send_empty_batch(server.url()).await.unwrap();

        assert!(sender_result.success);
        assert!(!sender_result.must_shutdown);
        assert_eq!(sender_result.time_from_server, SERVER_DATE_MILLIS);
    }

    #[tokio::test]
    async fn unrecoverable_failure_requires_shutdown() {
        let mut server = mockito::Server::new_async().await;
        server
            .mock("POST", "/bulk")
            .with_status(401)
            .create_async()
            .await;

        let sender_result = send_empty_batch(server.url())
            .await
            .expect("Failed to receive sender_result");

        assert!(!sender_result.success);
        assert!(sender_result.must_shutdown);
    }

    #[tokio::test]
    async fn recoverable_failures_are_attempted_multiple_times() {
        let mut server = mockito::Server::new_async().await;
        let mock = server
            .mock("POST", "/bulk")
            .with_status(400)
            .expect(2)
            .create_async()
            .await;

        let sender_result = send_empty_batch(server.url())
            .await
            .expect("Failed to receive sender_result");

        assert!(!sender_result.success);
        assert!(!sender_result.must_shutdown);
        mock.assert();
    }

    #[tokio::test]
    async fn retrying_requests_can_eventually_succeed() {
        let mut server = mockito::Server::new_async().await;
        // Registration order matters: the failing mock is created first and
        // the succeeding one second.
        server
            .mock("POST", "/bulk")
            .with_status(400)
            .create_async()
            .await;
        server
            .mock("POST", "/bulk")
            .with_status(200)
            .with_header("date", SERVER_DATE)
            .create_async()
            .await;

        let sender_result = send_empty_batch(server.url())
            .await
            .expect("Failed to receive sender_result");

        assert!(sender_result.success);
        assert!(!sender_result.must_shutdown);
        assert_eq!(sender_result.time_from_server, SERVER_DATE_MILLIS);
    }
}