launchdarkly_server_sdk/events/
sender.rs

1use crate::{
2    reqwest::is_http_error_recoverable, LAUNCHDARKLY_EVENT_SCHEMA_HEADER,
3    LAUNCHDARKLY_PAYLOAD_ID_HEADER,
4};
5use chrono::DateTime;
6use crossbeam_channel::Sender;
7use std::collections::HashMap;
8
9#[cfg(feature = "event-compression")]
10use flate2::write::GzEncoder;
11#[cfg(feature = "event-compression")]
12use flate2::Compression;
13#[cfg(feature = "event-compression")]
14use std::io::Write;
15
16use futures::future::BoxFuture;
17use hyper::{client::connect::Connection, service::Service, Uri};
18use tokio::{
19    io::{AsyncRead, AsyncWrite},
20    time::{sleep, Duration},
21};
22use uuid::Uuid;
23
24use super::event::OutputEvent;
25
/// Outcome of one call to [`EventSender::send_event_data`], delivered to the
/// caller over the result channel passed to that method.
pub struct EventSenderResult {
    /// `true` when the sender considers the batch delivered.
    pub success: bool,
    /// Set by `HyperEventSender` when the response status is classified as
    /// not recoverable by `is_http_error_recoverable`.
    // NOTE(review): presumably tells the receiver to stop sending events —
    // confirm against the consumer of this channel.
    pub must_shutdown: bool,
    /// Server time in milliseconds since the Unix epoch, as parsed from the
    /// response `Date` header by `HyperEventSender`; `0` on every failure
    /// path and when the header cannot be parsed.
    pub time_from_server: u128,
}
31
32pub trait EventSender: Send + Sync {
33    fn send_event_data(
34        &self,
35        events: Vec<OutputEvent>,
36        result_tx: Sender<EventSenderResult>,
37    ) -> BoxFuture<()>;
38}
39
40#[derive(Clone)]
41pub struct HyperEventSender<C> {
42    url: hyper::Uri,
43    sdk_key: String,
44    http: hyper::Client<C>,
45    default_headers: HashMap<&'static str, String>,
46
47    // used with event-compression feature
48    #[allow(dead_code)]
49    compress_events: bool,
50}
51
52impl<C> HyperEventSender<C>
53where
54    C: Service<Uri> + Clone + Send + Sync + 'static,
55    C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
56    C::Future: Send + Unpin + 'static,
57    C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
58{
59    pub fn new(
60        connector: C,
61        url: hyper::Uri,
62        sdk_key: &str,
63        default_headers: HashMap<&'static str, String>,
64        compress_events: bool,
65    ) -> Self {
66        Self {
67            url,
68            sdk_key: sdk_key.to_owned(),
69            http: hyper::Client::builder().build(connector),
70            default_headers,
71            compress_events,
72        }
73    }
74
75    fn get_server_time_from_response<Body>(&self, response: &hyper::Response<Body>) -> u128 {
76        let date_value = response
77            .headers()
78            .get("date")
79            .unwrap_or(&crate::EMPTY_HEADER)
80            .to_str()
81            .unwrap_or("")
82            .to_owned();
83
84        match DateTime::parse_from_rfc2822(&date_value) {
85            Ok(date) => date.timestamp_millis() as u128,
86            Err(_) => 0,
87        }
88    }
89}
90
91impl<C> EventSender for HyperEventSender<C>
92where
93    C: Service<Uri> + Clone + Send + Sync + 'static,
94    C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
95    C::Future: Send + Unpin + 'static,
96    C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
97{
98    fn send_event_data(
99        &self,
100        events: Vec<OutputEvent>,
101        result_tx: Sender<EventSenderResult>,
102    ) -> BoxFuture<()> {
103        Box::pin(async move {
104            let uuid = Uuid::new_v4();
105
106            debug!(
107                "Sending ({}): {}",
108                uuid,
109                serde_json::to_string_pretty(&events).unwrap_or_else(|e| e.to_string())
110            );
111
112            // mut is needed for event-compression feature
113            #[allow(unused_mut)]
114            let mut payload = match serde_json::to_vec(&events) {
115                Ok(json) => json,
116                Err(e) => {
117                    error!(
118                        "Failed to serialize event payload. Some events were dropped: {:?}",
119                        e
120                    );
121                    return;
122                }
123            };
124
125            // mut is needed for event-compression feature
126            #[allow(unused_mut)]
127            let mut additional_headers = self.default_headers.clone();
128
129            #[cfg(feature = "event-compression")]
130            if self.compress_events {
131                let mut e = GzEncoder::new(Vec::new(), Compression::default());
132                if e.write_all(payload.as_slice()).is_ok() {
133                    if let Ok(compressed) = e.finish() {
134                        payload = compressed;
135                        additional_headers.insert("Content-Encoding", "gzip".into());
136                    }
137                }
138            }
139
140            for attempt in 1..=2 {
141                if attempt == 2 {
142                    sleep(Duration::from_secs(1)).await;
143                }
144
145                let mut request_builder = hyper::Request::builder()
146                    .uri(self.url.clone())
147                    .method("POST")
148                    .header("Content-Type", "application/json")
149                    .header("Authorization", self.sdk_key.clone())
150                    .header("User-Agent", &*crate::USER_AGENT)
151                    .header(
152                        LAUNCHDARKLY_EVENT_SCHEMA_HEADER,
153                        crate::CURRENT_EVENT_SCHEMA,
154                    )
155                    .header(LAUNCHDARKLY_PAYLOAD_ID_HEADER, uuid.to_string());
156
157                for default_header in &additional_headers {
158                    request_builder =
159                        request_builder.header(*default_header.0, default_header.1.as_str());
160                }
161                let request = request_builder.body(hyper::Body::from(payload.clone()));
162
163                let result = self.http.request(request.unwrap()).await;
164
165                let response = match result {
166                    Ok(response) => response,
167                    Err(_) if attempt == 1 => continue,
168                    Err(e) => {
169                        // It appears this type of error will not be an HTTP error.
170                        // It will be a closed connection, aborted write, timeout, etc.
171                        error!("Failed to send events. Some events were dropped: {:?}", e);
172                        result_tx
173                            .send(EventSenderResult {
174                                success: false,
175                                time_from_server: 0,
176                                must_shutdown: false,
177                            })
178                            .unwrap();
179                        return;
180                    }
181                };
182
183                if response.status().is_success() {
184                    let _ = result_tx.send(EventSenderResult {
185                        success: true,
186                        time_from_server: self.get_server_time_from_response(&response),
187                        must_shutdown: false,
188                    });
189                    return;
190                }
191
192                if !is_http_error_recoverable(response.status().as_u16()) {
193                    result_tx
194                        .send(EventSenderResult {
195                            success: false,
196                            time_from_server: 0,
197                            must_shutdown: true,
198                        })
199                        .unwrap();
200                    return;
201                }
202            }
203
204            result_tx
205                .send(EventSenderResult {
206                    success: false,
207                    time_from_server: 0,
208                    must_shutdown: false,
209                })
210                .unwrap();
211        })
212    }
213}
214
#[cfg(test)]
/// Test-only [`EventSender`] that forwards events to an in-process channel
/// instead of making any HTTP request.
pub(crate) struct InMemoryEventSender {
    /// Channel that receives every event handed to `send_event_data`.
    event_tx: Sender<OutputEvent>,
}
219
#[cfg(test)]
impl InMemoryEventSender {
    /// Creates a sender that forwards each event it is given to `event_tx`.
    pub(crate) fn new(event_tx: Sender<OutputEvent>) -> Self {
        InMemoryEventSender { event_tx }
    }
}
226
#[cfg(test)]
impl EventSender for InMemoryEventSender {
    /// Forwards every event to the in-memory channel, then reports a
    /// successful result with `must_shutdown` set and a zero server time.
    ///
    /// Panics if either channel has been disconnected.
    fn send_event_data(
        &self,
        events: Vec<OutputEvent>,
        sender: Sender<EventSenderResult>,
    ) -> BoxFuture<()> {
        Box::pin(async move {
            events
                .into_iter()
                .for_each(|event| self.event_tx.send(event).unwrap());

            let outcome = EventSenderResult {
                success: true,
                must_shutdown: true,
                time_from_server: 0,
            };
            sender.send(outcome).unwrap();
        })
    }
}
249
#[cfg(test)]
mod tests {
    use super::*;
    use crossbeam_channel::bounded;
    use std::str::FromStr;
    use test_case::test_case;

    // Pins the recoverable / unrecoverable classification of HTTP statuses.
    #[test_case(hyper::StatusCode::CONTINUE, true)]
    #[test_case(hyper::StatusCode::OK, true)]
    #[test_case(hyper::StatusCode::MULTIPLE_CHOICES, true)]
    #[test_case(hyper::StatusCode::BAD_REQUEST, true)]
    #[test_case(hyper::StatusCode::UNAUTHORIZED, false)]
    #[test_case(hyper::StatusCode::REQUEST_TIMEOUT, true)]
    #[test_case(hyper::StatusCode::CONFLICT, false)]
    #[test_case(hyper::StatusCode::TOO_MANY_REQUESTS, true)]
    #[test_case(hyper::StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE, false)]
    #[test_case(hyper::StatusCode::INTERNAL_SERVER_ERROR, true)]
    fn can_determine_recoverable_errors(status: hyper::StatusCode, is_recoverable: bool) {
        let code = status.as_u16();
        assert_eq!(is_recoverable, is_http_error_recoverable(code));
    }

    // A 200 response with a Date header yields that date in epoch milliseconds.
    #[tokio::test]
    async fn can_parse_server_time_from_response() {
        let mut server = mockito::Server::new_async().await;
        server
            .mock("POST", "/bulk")
            .with_status(200)
            .with_header("date", "Fri, 13 Feb 2009 23:31:30 GMT")
            .create_async()
            .await;

        let outcome = send_empty_batch(server.url()).await.unwrap();

        assert!(outcome.success);
        assert!(!outcome.must_shutdown);
        assert_eq!(outcome.time_from_server, 1234567890000);
    }

    // A 401 is not recoverable, so the result must request a shutdown.
    #[tokio::test]
    async fn unrecoverable_failure_requires_shutdown() {
        let mut server = mockito::Server::new_async().await;
        server
            .mock("POST", "/bulk")
            .with_status(401)
            .create_async()
            .await;

        let outcome = send_empty_batch(server.url())
            .await
            .expect("Failed to receive sender_result");

        assert!(!outcome.success);
        assert!(outcome.must_shutdown);
    }

    // A recoverable status (400) is retried once: the mock must be hit twice.
    #[tokio::test]
    async fn recoverable_failures_are_attempted_multiple_times() {
        let mut server = mockito::Server::new_async().await;
        let bulk_mock = server
            .mock("POST", "/bulk")
            .with_status(400)
            .expect(2)
            .create_async()
            .await;

        let outcome = send_empty_batch(server.url())
            .await
            .expect("Failed to receive sender_result");

        assert!(!outcome.success);
        assert!(!outcome.must_shutdown);
        bulk_mock.assert();
    }

    // First response is a 400, the retry gets a 200 and counts as success.
    #[tokio::test]
    async fn retrying_requests_can_eventually_succeed() {
        let mut server = mockito::Server::new_async().await;
        server
            .mock("POST", "/bulk")
            .with_status(400)
            .create_async()
            .await;
        server
            .mock("POST", "/bulk")
            .with_status(200)
            .with_header("date", "Fri, 13 Feb 2009 23:31:30 GMT")
            .create_async()
            .await;

        let outcome = send_empty_batch(server.url())
            .await
            .expect("Failed to receive sender_result");

        assert!(outcome.success);
        assert!(!outcome.must_shutdown);
        assert_eq!(outcome.time_from_server, 1234567890000);
    }

    // Sends an empty batch to `<server_url>/bulk` and returns whatever the
    // result channel yields, leaving unwrap/expect to the calling test.
    async fn send_empty_batch(
        server_url: String,
    ) -> Result<EventSenderResult, crossbeam_channel::RecvError> {
        let (tx, rx) = bounded::<EventSenderResult>(5);
        let event_sender = build_event_sender(server_url);

        event_sender.send_event_data(vec![], tx).await;

        rx.recv()
    }

    // Builds an uncompressed sender with no extra headers, aimed at the mock
    // server's `/bulk` path.
    fn build_event_sender(url: String) -> HyperEventSender<hyper::client::HttpConnector> {
        let bulk_url = format!("{}/bulk", &url);
        let bulk_uri = hyper::Uri::from_str(&bulk_url).expect("Failed parsing the mock server url");
        let connector = hyper::client::HttpConnector::new();
        let no_headers = HashMap::<&str, String>::new();

        HyperEventSender::new(connector, bulk_uri, "sdk-key", no_headers, false)
    }
}