launchdarkly_server_sdk/events/
processor_builders.rs

1use std::collections::{HashMap, HashSet};
2use std::num::NonZeroUsize;
3use std::str::FromStr;
4use std::sync::Arc;
5use std::time::Duration;
6
7use hyper::client::connect::Connection;
8use hyper::service::Service;
9use hyper::Uri;
10#[cfg(feature = "rustls")]
11use hyper_rustls::HttpsConnectorBuilder;
12use launchdarkly_server_sdk_evaluation::Reference;
13use thiserror::Error;
14use tokio::io::{AsyncRead, AsyncWrite};
15
16use crate::events::sender::HyperEventSender;
17use crate::{service_endpoints, LAUNCHDARKLY_TAGS_HEADER};
18
19use super::processor::{
20    EventProcessor, EventProcessorError, EventProcessorImpl, NullEventProcessor,
21};
22use super::sender::EventSender;
23use super::{EventsConfiguration, OnEventSenderResultSuccess};
24
/// Default interval between flushes of the event buffer.
const DEFAULT_FLUSH_POLL_INTERVAL: Duration = Duration::from_secs(5);
/// Default number of events the buffer may hold.
const DEFAULT_EVENT_CAPACITY: usize = 500;
// `NonZeroUsize::new` yields an `Option`, so this constant is optional. The builder falls back
// to a capacity of 1 if it is ever `None`; effectively the capacity will be set to
// max(DEFAULT_CONTEXT_KEY_CAPACITY, 1), meaning caching cannot be entirely disabled.
const DEFAULT_CONTEXT_KEY_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(1000);
/// Default interval at which the cache of known context keys is reset.
const DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL: Duration = Duration::from_secs(5 * 60);
31
/// Error type used to represent failures when building an [EventProcessor] instance.
///
/// Marked `#[non_exhaustive]`, so matches outside this crate need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum BuildError {
    /// Error used when a configuration setting is invalid. The payload is a human readable
    /// description that is appended to the error message.
    #[error("event processor factory failed to build: {0}")]
    InvalidConfig(String),

    /// Error used when the event processor's thread fails to start. Displays as the wrapped
    /// [EventProcessorError].
    #[error(transparent)]
    FailedToStart(EventProcessorError),
}
44
/// Trait which allows creation of event processors. Should be implemented by event processor
/// builder types.
pub trait EventProcessorFactory {
    /// Builds an event processor from the factory's current configuration.
    ///
    /// Implementations receive the service `endpoints`, the `sdk_key`, and optional `tags`;
    /// how (or whether) each is used is up to the implementation.
    ///
    /// Returns a [BuildError] when the processor cannot be constructed.
    fn build(
        &self,
        endpoints: &service_endpoints::ServiceEndpoints,
        sdk_key: &str,
        tags: Option<String>,
    ) -> Result<Arc<dyn EventProcessor>, BuildError>;

    /// Returns a boxed, owned factory derived from `self`. The implementations in this file
    /// return a clone of the builder.
    fn to_owned(&self) -> Box<dyn EventProcessorFactory>;
}
56
/// Contains methods for configuring delivery of analytics events.
///
/// The SDK normally buffers analytics events and sends them to LaunchDarkly at intervals. If you want
/// to customize this behavior, create a builder with [crate::EventProcessorBuilder::new], change its
/// properties with the methods of this struct, and pass it to [crate::ConfigBuilder::event_processor].
///
/// The type parameter `C` is the connector type used by the event sender; see
/// [EventProcessorBuilder::https_connector].
///
/// # Examples
///
/// Adjust the flush interval
/// ```
/// # use launchdarkly_server_sdk::{EventProcessorBuilder, ConfigBuilder};
/// # use hyper_rustls::HttpsConnector;
/// # use hyper::client::HttpConnector;
/// # use std::time::Duration;
/// # fn main() {
///     ConfigBuilder::new("sdk-key").event_processor(EventProcessorBuilder::<HttpsConnector<HttpConnector>>::new()
///         .flush_interval(Duration::from_secs(10)));
/// # }
/// ```
#[derive(Clone)]
pub struct EventProcessorBuilder<C> {
    // Capacity of the in-memory events buffer.
    capacity: usize,
    // Interval between flushes of the event buffer.
    flush_interval: Duration,
    // Number of context keys the processor can remember at any one time.
    context_keys_capacity: NonZeroUsize,
    // Interval at which the cache of known context keys is reset.
    context_keys_flush_interval: Duration,
    // Event sender override; only settable through the `#[cfg(test)]` `event_sender` method.
    event_sender: Option<Arc<dyn EventSender>>,
    // When true, all optional attributes are treated as private.
    all_attributes_private: bool,
    // Attribute references that are always treated as private.
    private_attributes: HashSet<Reference>,
    // Optional caller-supplied connector for the event sender.
    connector: Option<C>,
    // When true, anonymous contexts are omitted from index and identify events.
    omit_anonymous_contexts: bool,
    // Whether the event payload should be gzip compressed.
    compress_events: bool,
    // diagnostic_recording_interval: Duration
    // Callback invoked when handling an `EventSenderResult` with `success = true`.
    on_success: OnEventSenderResultSuccess,
}
91
92impl<C> EventProcessorFactory for EventProcessorBuilder<C>
93where
94    C: Service<Uri> + Clone + Send + Sync + 'static,
95    C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
96    C::Future: Send + Unpin + 'static,
97    C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
98{
99    fn build(
100        &self,
101        endpoints: &service_endpoints::ServiceEndpoints,
102        sdk_key: &str,
103        tags: Option<String>,
104    ) -> Result<Arc<dyn EventProcessor>, BuildError> {
105        let url_string = format!("{}/bulk", endpoints.events_base_url());
106
107        let mut default_headers = HashMap::<&str, String>::new();
108
109        if let Some(tags) = tags {
110            default_headers.insert(LAUNCHDARKLY_TAGS_HEADER, tags);
111        }
112
113        let event_sender_result: Result<Arc<dyn EventSender>, BuildError> =
114            // NOTE: This would only be possible under unit testing conditions.
115            if let Some(event_sender) = &self.event_sender {
116                Ok(event_sender.clone())
117            } else if let Some(connector) = &self.connector {
118                Ok(Arc::new(HyperEventSender::new(
119                    connector.clone(),
120                    hyper::Uri::from_str(url_string.as_str()).unwrap(),
121                    sdk_key,
122                    default_headers,
123                    self.compress_events,
124                )))
125            } else {
126                #[cfg(feature = "rustls")]
127                {
128                    let connector = HttpsConnectorBuilder::new()
129                        .with_native_roots()
130                        .https_or_http()
131                        .enable_http1()
132                        .enable_http2()
133                        .build();
134
135                    Ok(Arc::new(HyperEventSender::new(
136                        connector,
137                        hyper::Uri::from_str(url_string.as_str()).unwrap(),
138                        sdk_key,
139                        default_headers,
140                        self.compress_events,
141                    )))
142                }
143                #[cfg(not(feature = "rustls"))]
144                Err(BuildError::InvalidConfig(
145                    "https connector is required when rustls is disabled".into(),
146                ))
147            };
148        let event_sender = event_sender_result?;
149
150        let events_configuration = EventsConfiguration {
151            event_sender,
152            capacity: self.capacity,
153            flush_interval: self.flush_interval,
154            context_keys_capacity: self.context_keys_capacity,
155            context_keys_flush_interval: self.context_keys_flush_interval,
156            all_attributes_private: self.all_attributes_private,
157            private_attributes: self.private_attributes.clone(),
158            omit_anonymous_contexts: self.omit_anonymous_contexts,
159            on_success: self.on_success.clone(),
160        };
161
162        let events_processor =
163            EventProcessorImpl::new(events_configuration).map_err(BuildError::FailedToStart)?;
164
165        Ok(Arc::new(events_processor))
166    }
167
168    fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
169        Box::new(self.clone())
170    }
171}
172
173impl<C> EventProcessorBuilder<C> {
174    /// Create a new [EventProcessorBuilder] with all default values.
175    pub fn new() -> Self {
176        Self {
177            capacity: DEFAULT_EVENT_CAPACITY,
178            flush_interval: DEFAULT_FLUSH_POLL_INTERVAL,
179            context_keys_capacity: DEFAULT_CONTEXT_KEY_CAPACITY
180                .unwrap_or_else(|| NonZeroUsize::new(1).unwrap()),
181            context_keys_flush_interval: DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL,
182            event_sender: None,
183            all_attributes_private: false,
184            private_attributes: HashSet::new(),
185            omit_anonymous_contexts: false,
186            connector: None,
187            compress_events: false,
188            on_success: Arc::new(|_| ()),
189        }
190    }
191
192    /// Set the capacity of the events buffer.
193    ///
194    /// The client buffers up to this many events in memory before flushing. If the capacity is exceeded before
195    /// the buffer is flushed [crate::EventProcessor::flush], events will be discarded. Increasing the
196    /// capacity means that events are less likely to be discarded, at the cost of consuming more memory.
197    ///
198    pub fn capacity(&mut self, capacity: usize) -> &mut Self {
199        self.capacity = capacity;
200        self
201    }
202
203    /// Sets the interval between flushes of the event buffer.
204    ///
205    /// Decreasing the flush interval means that the event buffer is less likely to reach capacity.
206    pub fn flush_interval(&mut self, flush_interval: Duration) -> &mut Self {
207        self.flush_interval = flush_interval;
208        self
209    }
210
211    /// Sets the number of context keys that the event processor can remember at any one time.
212    ///
213    /// To avoid sending duplicate context details in analytics events, the SDK maintains a cache of
214    /// recently seen context keys.
215    pub fn context_keys_capacity(&mut self, context_keys_capacity: NonZeroUsize) -> &mut Self {
216        self.context_keys_capacity = context_keys_capacity;
217        self
218    }
219
220    /// Sets the interval at which the event processor will reset its cache of known context keys.
221    pub fn context_keys_flush_interval(
222        &mut self,
223        context_keys_flush_interval: Duration,
224    ) -> &mut Self {
225        self.context_keys_flush_interval = context_keys_flush_interval;
226        self
227    }
228
229    /// Sets whether or not all optional user attributes should be hidden from LaunchDarkly.
230    ///
231    /// If this is true, all user attribute values (other than the key) will be private, not just the attributes
232    /// specified with private_attributes or on a per-user basis with UserBuilder methods. By default, it is false.
233    pub fn all_attributes_private(&mut self, all_attributes_private: bool) -> &mut Self {
234        self.all_attributes_private = all_attributes_private;
235        self
236    }
237
238    /// Marks a set of attribute names as always private.
239    ///
240    /// Any users sent to LaunchDarkly with this configuration active will have attributes with these
241    /// names removed. This is in addition to any attributes that were marked as private for an
242    /// individual user with UserBuilder methods. Setting all_attribute_private to true overrides this.
243    pub fn private_attributes<R>(&mut self, attributes: HashSet<R>) -> &mut Self
244    where
245        R: Into<Reference>,
246    {
247        self.private_attributes = attributes.into_iter().map(|a| a.into()).collect();
248        self
249    }
250
251    /// Sets the connector for the event sender to use. This allows for re-use of a connector
252    /// between multiple client instances. This is especially useful for the `sdk-test-harness`
253    /// where many client instances are created throughout the test and reading the native
254    /// certificates is a substantial portion of the runtime.
255    pub fn https_connector(&mut self, connector: C) -> &mut Self {
256        self.connector = Some(connector);
257        self
258    }
259
260    /// Sets whether anonymous contexts should be omitted from index and identify events.
261    ///
262    /// The default is false, meaning that anonymous contexts will be included in index and
263    /// identify events.
264    pub fn omit_anonymous_contexts(&mut self, omit: bool) -> &mut Self {
265        self.omit_anonymous_contexts = omit;
266        self
267    }
268
269    #[cfg(feature = "event-compression")]
270    /// Should the event payload sent to LaunchDarkly use gzip compression. By
271    /// default this is false to prevent backward breaking compatibility issues with
272    /// older versions of the relay proxy.
273    //
274    /// Customers not using the relay proxy are strongly encouraged to enable this
275    /// feature to reduce egress bandwidth cost.
276    pub fn compress_events(&mut self, enabled: bool) -> &mut Self {
277        self.compress_events = enabled;
278        self
279    }
280
281    /// Set a callback method to be called when handling an `EventSenderResult` with `success = true`.
282    pub fn on_success(&mut self, on_success: OnEventSenderResultSuccess) -> &mut Self {
283        self.on_success = on_success;
284        self
285    }
286
287    #[cfg(test)]
288    /// Test only functionality that allows us to override the event sender.
289    pub fn event_sender(&mut self, event_sender: Arc<dyn EventSender>) -> &mut Self {
290        self.event_sender = Some(event_sender);
291        self
292    }
293}
294
295impl<C> Default for EventProcessorBuilder<C> {
296    fn default() -> Self {
297        Self::new()
298    }
299}
300
/// An implementation of EventProcessorFactory that will discard all events received. This should
/// only be used for unit tests.
///
/// The builder carries no configuration; its `build` produces a [NullEventProcessor].
#[derive(Clone)]
pub struct NullEventProcessorBuilder {}
305
306impl EventProcessorFactory for NullEventProcessorBuilder {
307    fn build(
308        &self,
309        _: &service_endpoints::ServiceEndpoints,
310        _: &str,
311        _: Option<String>,
312    ) -> Result<Arc<dyn EventProcessor>, BuildError> {
313        Ok(Arc::new(NullEventProcessor::new()))
314    }
315
316    fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
317        Box::new(self.clone())
318    }
319}
320
321impl NullEventProcessorBuilder {
322    /// Create a new [NullEventProcessorBuilder] with all default values.
323    pub fn new() -> Self {
324        Self {}
325    }
326}
327
328impl Default for NullEventProcessorBuilder {
329    fn default() -> Self {
330        Self::new()
331    }
332}
333
#[cfg(test)]
mod tests {
    use hyper::client::HttpConnector;
    use launchdarkly_server_sdk_evaluation::ContextBuilder;
    use maplit::hashset;
    use mockito::Matcher;
    use test_case::test_case;

    use crate::{events::event::EventFactory, ServiceEndpointsBuilder};

    use super::*;

    // Pins every default the builder exposes, so an accidental change to any of them is caught.
    #[test]
    fn default_builder_has_correct_defaults() {
        let builder = EventProcessorBuilder::<HttpConnector>::new();
        assert_eq!(builder.capacity, DEFAULT_EVENT_CAPACITY);
        assert_eq!(builder.flush_interval, DEFAULT_FLUSH_POLL_INTERVAL);
        assert_eq!(
            Some(builder.context_keys_capacity),
            DEFAULT_CONTEXT_KEY_CAPACITY
        );
        assert_eq!(
            builder.context_keys_flush_interval,
            DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL
        );
        assert!(!builder.all_attributes_private);
        assert!(builder.private_attributes.is_empty());
        assert!(!builder.omit_anonymous_contexts);
        assert!(!builder.compress_events);
        assert!(builder.connector.is_none());
        assert!(builder.event_sender.is_none());
    }

    #[test]
    fn capacity_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::<HttpConnector>::new();
        builder.capacity(1234);
        assert_eq!(builder.capacity, 1234);
    }

    #[test]
    fn flush_interval_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::<HttpConnector>::new();
        builder.flush_interval(Duration::from_secs(1234));
        assert_eq!(builder.flush_interval, Duration::from_secs(1234));
    }

    #[test]
    fn context_keys_capacity_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::<HttpConnector>::new();
        let cap = NonZeroUsize::new(1234).expect("1234 > 0");
        builder.context_keys_capacity(cap);
        assert_eq!(builder.context_keys_capacity, cap);
    }

    #[test]
    fn context_keys_flush_interval_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::<HttpConnector>::new();
        builder.context_keys_flush_interval(Duration::from_secs(1000));
        assert_eq!(
            builder.context_keys_flush_interval,
            Duration::from_secs(1000)
        );
    }

    #[test]
    fn all_attribute_private_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::<HttpConnector>::new();

        assert!(!builder.all_attributes_private);
        builder.all_attributes_private(true);
        assert!(builder.all_attributes_private);
    }

    #[test]
    fn private_attributes_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::<HttpConnector>::new();

        assert!(builder.private_attributes.is_empty());
        builder.private_attributes(hashset!["name"]);
        assert!(builder.private_attributes.contains(&"name".into()));
    }

    // Sends one identify event through a real processor against a mock server and checks that
    // the tags header is present exactly when tags were supplied to `build`.
    #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")]
    #[test_case(None, Matcher::Missing)]
    fn processor_sends_correct_headers(tag: Option<String>, matcher: impl Into<Matcher>) {
        let mut server = mockito::Server::new();
        let mock = server
            .mock("POST", "/bulk")
            .with_status(200)
            .expect_at_least(1)
            .match_header(LAUNCHDARKLY_TAGS_HEADER, matcher)
            .create();

        let service_endpoints = ServiceEndpointsBuilder::new()
            .events_base_url(&server.url())
            .polling_base_url(&server.url())
            .streaming_base_url(&server.url())
            .build()
            .expect("Service endpoints failed to be created");

        let builder = EventProcessorBuilder::<HttpConnector>::new();
        let processor = builder
            .build(&service_endpoints, "sdk-key", tag)
            .expect("Processor failed to build");

        let event_factory = EventFactory::new(false);

        let context = ContextBuilder::new("bob")
            .build()
            .expect("Failed to create context");
        let identify_event = event_factory.new_identify(context);

        processor.send(identify_event);
        processor.close();

        mock.assert()
    }
}