launchdarkly_server_sdk/events/
processor_builders.rs1use std::collections::{HashMap, HashSet};
2use std::num::NonZeroUsize;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use hyper::client::connect::Connection;
8use hyper::service::Service;
9use hyper::Uri;
10#[cfg(feature = "rustls")]
11use hyper_rustls::HttpsConnectorBuilder;
12use launchdarkly_server_sdk_evaluation::Reference;
13use thiserror::Error;
14use tokio::io::{AsyncRead, AsyncWrite};
15
16use crate::events::sender::HyperEventSender;
17use crate::{service_endpoints, LAUNCHDARKLY_TAGS_HEADER};
18
19use super::processor::{
20 EventProcessor, EventProcessorError, EventProcessorImpl, NullEventProcessor,
21};
22use super::sender::EventSender;
23use super::{EventsConfiguration, OnEventSenderResultSuccess};
24
/// Initial value of `EventProcessorBuilder::flush_interval` (5 seconds).
const DEFAULT_FLUSH_POLL_INTERVAL: Duration = Duration::from_secs(5);
/// Initial value of `EventProcessorBuilder::capacity`.
const DEFAULT_EVENT_CAPACITY: usize = 500;
/// Initial value of `EventProcessorBuilder::context_keys_capacity`.
///
/// Stored as an `Option` because that is what the `const` constructor `NonZeroUsize::new`
/// returns; `EventProcessorBuilder::new` unwraps it, substituting 1 if it were ever `None`.
const DEFAULT_CONTEXT_KEY_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(1_000);
/// Initial value of `EventProcessorBuilder::context_keys_flush_interval` (5 minutes).
const DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL: Duration = Duration::from_secs(300);
31
32#[non_exhaustive]
34#[derive(Debug, Error)]
35pub enum BuildError {
36 #[error("event processor factory failed to build: {0}")]
38 InvalidConfig(String),
39
40 #[error(transparent)]
42 FailedToStart(EventProcessorError),
43}
44
45pub trait EventProcessorFactory {
48 fn build(
49 &self,
50 endpoints: &service_endpoints::ServiceEndpoints,
51 sdk_key: &str,
52 tags: Option<String>,
53 ) -> Result<Arc<dyn EventProcessor>, BuildError>;
54 fn to_owned(&self) -> Box<dyn EventProcessorFactory>;
55}
56
57#[derive(Clone)]
77pub struct EventProcessorBuilder<C> {
78 capacity: usize,
79 flush_interval: Duration,
80 context_keys_capacity: NonZeroUsize,
81 context_keys_flush_interval: Duration,
82 event_sender: Option<Arc<dyn EventSender>>,
83 all_attributes_private: bool,
84 private_attributes: HashSet<Reference>,
85 connector: Option<C>,
86 omit_anonymous_contexts: bool,
87 compress_events: bool,
88 on_success: OnEventSenderResultSuccess,
90}
91
92impl<C> EventProcessorFactory for EventProcessorBuilder<C>
93where
94 C: Service<Uri> + Clone + Send + Sync + 'static,
95 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
96 C::Future: Send + Unpin + 'static,
97 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
98{
99 fn build(
100 &self,
101 endpoints: &service_endpoints::ServiceEndpoints,
102 sdk_key: &str,
103 tags: Option<String>,
104 ) -> Result<Arc<dyn EventProcessor>, BuildError> {
105 let url_string = format!("{}/bulk", endpoints.events_base_url());
106
107 let mut default_headers = HashMap::<&str, String>::new();
108
109 if let Some(tags) = tags {
110 default_headers.insert(LAUNCHDARKLY_TAGS_HEADER, tags);
111 }
112
113 let event_sender_result: Result<Arc<dyn EventSender>, BuildError> =
114 if let Some(event_sender) = &self.event_sender {
116 Ok(event_sender.clone())
117 } else if let Some(connector) = &self.connector {
118 Ok(Arc::new(HyperEventSender::new(
119 connector.clone(),
120 hyper::Uri::from_str(url_string.as_str()).unwrap(),
121 sdk_key,
122 default_headers,
123 self.compress_events,
124 )))
125 } else {
126 #[cfg(feature = "rustls")]
127 {
128 let connector = HttpsConnectorBuilder::new()
129 .with_native_roots()
130 .https_or_http()
131 .enable_http1()
132 .enable_http2()
133 .build();
134
135 Ok(Arc::new(HyperEventSender::new(
136 connector,
137 hyper::Uri::from_str(url_string.as_str()).unwrap(),
138 sdk_key,
139 default_headers,
140 self.compress_events,
141 )))
142 }
143 #[cfg(not(feature = "rustls"))]
144 Err(BuildError::InvalidConfig(
145 "https connector is required when rustls is disabled".into(),
146 ))
147 };
148 let event_sender = event_sender_result?;
149
150 let events_configuration = EventsConfiguration {
151 event_sender,
152 capacity: self.capacity,
153 flush_interval: self.flush_interval,
154 context_keys_capacity: self.context_keys_capacity,
155 context_keys_flush_interval: self.context_keys_flush_interval,
156 all_attributes_private: self.all_attributes_private,
157 private_attributes: self.private_attributes.clone(),
158 omit_anonymous_contexts: self.omit_anonymous_contexts,
159 on_success: self.on_success.clone(),
160 };
161
162 let events_processor =
163 EventProcessorImpl::new(events_configuration).map_err(BuildError::FailedToStart)?;
164
165 Ok(Arc::new(events_processor))
166 }
167
168 fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
169 Box::new(self.clone())
170 }
171}
172
173impl<C> EventProcessorBuilder<C> {
174 pub fn new() -> Self {
176 Self {
177 capacity: DEFAULT_EVENT_CAPACITY,
178 flush_interval: DEFAULT_FLUSH_POLL_INTERVAL,
179 context_keys_capacity: DEFAULT_CONTEXT_KEY_CAPACITY
180 .unwrap_or_else(|| NonZeroUsize::new(1).unwrap()),
181 context_keys_flush_interval: DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL,
182 event_sender: None,
183 all_attributes_private: false,
184 private_attributes: HashSet::new(),
185 omit_anonymous_contexts: false,
186 connector: None,
187 compress_events: false,
188 on_success: Arc::new(|_| ()),
189 }
190 }
191
192 pub fn capacity(&mut self, capacity: usize) -> &mut Self {
199 self.capacity = capacity;
200 self
201 }
202
203 pub fn flush_interval(&mut self, flush_interval: Duration) -> &mut Self {
207 self.flush_interval = flush_interval;
208 self
209 }
210
211 pub fn context_keys_capacity(&mut self, context_keys_capacity: NonZeroUsize) -> &mut Self {
216 self.context_keys_capacity = context_keys_capacity;
217 self
218 }
219
220 pub fn context_keys_flush_interval(
222 &mut self,
223 context_keys_flush_interval: Duration,
224 ) -> &mut Self {
225 self.context_keys_flush_interval = context_keys_flush_interval;
226 self
227 }
228
229 pub fn all_attributes_private(&mut self, all_attributes_private: bool) -> &mut Self {
234 self.all_attributes_private = all_attributes_private;
235 self
236 }
237
238 pub fn private_attributes<R>(&mut self, attributes: HashSet<R>) -> &mut Self
244 where
245 R: Into<Reference>,
246 {
247 self.private_attributes = attributes.into_iter().map(|a| a.into()).collect();
248 self
249 }
250
251 pub fn https_connector(&mut self, connector: C) -> &mut Self {
256 self.connector = Some(connector);
257 self
258 }
259
260 pub fn omit_anonymous_contexts(&mut self, omit: bool) -> &mut Self {
265 self.omit_anonymous_contexts = omit;
266 self
267 }
268
269 #[cfg(feature = "event-compression")]
270 pub fn compress_events(&mut self, enabled: bool) -> &mut Self {
277 self.compress_events = enabled;
278 self
279 }
280
281 pub fn on_success(&mut self, on_success: OnEventSenderResultSuccess) -> &mut Self {
283 self.on_success = on_success;
284 self
285 }
286
287 #[cfg(test)]
288 pub fn event_sender(&mut self, event_sender: Arc<dyn EventSender>) -> &mut Self {
290 self.event_sender = Some(event_sender);
291 self
292 }
293}
294
295impl<C> Default for EventProcessorBuilder<C> {
296 fn default() -> Self {
297 Self::new()
298 }
299}
300
/// Field-less [`EventProcessorFactory`] whose `build` always yields a `NullEventProcessor`.
#[derive(Clone)]
pub struct NullEventProcessorBuilder {}
305
306impl EventProcessorFactory for NullEventProcessorBuilder {
307 fn build(
308 &self,
309 _: &service_endpoints::ServiceEndpoints,
310 _: &str,
311 _: Option<String>,
312 ) -> Result<Arc<dyn EventProcessor>, BuildError> {
313 Ok(Arc::new(NullEventProcessor::new()))
314 }
315
316 fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
317 Box::new(self.clone())
318 }
319}
320
321impl NullEventProcessorBuilder {
322 pub fn new() -> Self {
324 Self {}
325 }
326}
327
328impl Default for NullEventProcessorBuilder {
329 fn default() -> Self {
330 Self::new()
331 }
332}
333
#[cfg(test)]
mod tests {
    use hyper::client::HttpConnector;
    use launchdarkly_server_sdk_evaluation::ContextBuilder;
    use maplit::hashset;
    use mockito::Matcher;
    use test_case::test_case;

    use crate::{events::event::EventFactory, ServiceEndpointsBuilder};

    use super::*;

    // Covers every default that `EventProcessorBuilder::new` sets and that can be compared
    // (the `on_success` callback is an opaque closure and is therefore not checked).
    #[test]
    fn default_builder_has_correct_defaults() {
        let builder = EventProcessorBuilder::<HttpConnector>::new();
        assert_eq!(builder.capacity, DEFAULT_EVENT_CAPACITY);
        assert_eq!(builder.flush_interval, DEFAULT_FLUSH_POLL_INTERVAL);
        assert_eq!(
            Some(builder.context_keys_capacity),
            DEFAULT_CONTEXT_KEY_CAPACITY
        );
        assert_eq!(
            builder.context_keys_flush_interval,
            DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL
        );
        assert!(builder.event_sender.is_none());
        assert!(builder.connector.is_none());
        assert!(!builder.all_attributes_private);
        assert!(builder.private_attributes.is_empty());
        assert!(!builder.omit_anonymous_contexts);
        assert!(!builder.compress_events);
    }

    #[test]
    fn capacity_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::<HttpConnector>::new();
        builder.capacity(1234);
        assert_eq!(builder.capacity, 1234);
    }

    #[test]
    fn flush_interval_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::<HttpConnector>::new();
        builder.flush_interval(Duration::from_secs(1234));
        assert_eq!(builder.flush_interval, Duration::from_secs(1234));
    }

    #[test]
    fn context_keys_capacity_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::<HttpConnector>::new();
        let cap = NonZeroUsize::new(1234).expect("1234 > 0");
        builder.context_keys_capacity(cap);
        assert_eq!(builder.context_keys_capacity, cap);
    }

    #[test]
    fn context_keys_flush_interval_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::<HttpConnector>::new();
        builder.context_keys_flush_interval(Duration::from_secs(1000));
        assert_eq!(
            builder.context_keys_flush_interval,
            Duration::from_secs(1000)
        );
    }

    #[test]
    fn all_attribute_private_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::<HttpConnector>::new();

        assert!(!builder.all_attributes_private);
        builder.all_attributes_private(true);
        assert!(builder.all_attributes_private);
    }

    #[test]
    fn attribute_names_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::<HttpConnector>::new();

        assert!(builder.private_attributes.is_empty());
        builder.private_attributes(hashset!["name"]);
        assert!(builder.private_attributes.contains(&"name".into()));
    }

    #[test]
    fn omit_anonymous_contexts_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::<HttpConnector>::new();

        assert!(!builder.omit_anonymous_contexts);
        builder.omit_anonymous_contexts(true);
        assert!(builder.omit_anonymous_contexts);
    }

    // End-to-end check against a mock server: the tags header is present with the supplied value
    // when tags are given, and absent otherwise.
    #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")]
    #[test_case(None, Matcher::Missing)]
    fn processor_sends_correct_headers(tag: Option<String>, matcher: impl Into<Matcher>) {
        let mut server = mockito::Server::new();
        let mock = server
            .mock("POST", "/bulk")
            .with_status(200)
            .expect_at_least(1)
            .match_header(LAUNCHDARKLY_TAGS_HEADER, matcher)
            .create();

        let service_endpoints = ServiceEndpointsBuilder::new()
            .events_base_url(&server.url())
            .polling_base_url(&server.url())
            .streaming_base_url(&server.url())
            .build()
            .expect("Service endpoints failed to be created");

        let builder = EventProcessorBuilder::<HttpConnector>::new();
        let processor = builder
            .build(&service_endpoints, "sdk-key", tag)
            .expect("Processor failed to build");

        let event_factory = EventFactory::new(false);

        let context = ContextBuilder::new("bob")
            .build()
            .expect("Failed to create context");
        let identify_event = event_factory.new_identify(context);

        processor.send(identify_event);
        processor.close();

        mock.assert()
    }
}