use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
#[cfg(feature = "rustls")]
use hyper_rustls::HttpsConnectorBuilder;
#[cfg(all(feature = "hypertls", not(feature = "rustls")))]
use hyper_tls::HttpsConnector;
use launchdarkly_server_sdk_evaluation::Reference;
use thiserror::Error;
use crate::events::sender::HyperEventSender;
use crate::{service_endpoints, LAUNCHDARKLY_TAGS_HEADER};
use super::processor::{
EventProcessor, EventProcessorError, EventProcessorImpl, NullEventProcessor,
};
use super::sender::EventSender;
use super::{EventsConfiguration, OnEventSenderResultSuccess};
/// Default `flush_interval` applied by `EventProcessorBuilder::new`: 5 seconds.
const DEFAULT_FLUSH_POLL_INTERVAL: Duration = Duration::from_secs(5);
/// Default `capacity` applied by `EventProcessorBuilder::new`: 500.
const DEFAULT_EVENT_CAPACITY: usize = 500;
/// Default `context_keys_capacity` applied by `EventProcessorBuilder::new`: 1000.
///
/// `NonZeroUsize::new` returns an `Option`, so the constant is stored as an
/// `Option` and unwrapped at the point where the default is applied.
const DEFAULT_CONTEXT_KEY_CAPACITY: Option<NonZeroUsize> = NonZeroUsize::new(1000);
/// Default `context_keys_flush_interval` applied by `EventProcessorBuilder::new`:
/// 5 minutes, expressed in seconds.
const DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL: Duration = Duration::from_secs(5 * 60);
/// Errors that an [`EventProcessorFactory`] can return from `build`.
///
/// Marked `#[non_exhaustive]`, so code outside this crate must include a
/// wildcard arm when matching on it.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum BuildError {
/// The factory could not build a processor; the payload is a human-readable
/// description that is appended to the error message.
#[error("event processor factory failed to build: {0}")]
InvalidConfig(String),
/// Wraps an [`EventProcessorError`]. Because the variant is `transparent`,
/// its `Display` output and error source are those of the wrapped error.
#[error(transparent)]
FailedToStart(EventProcessorError),
}
/// Factory abstraction for constructing an [`EventProcessor`].
///
/// The implementations in this file are [`EventProcessorBuilder`] and
/// [`NullEventProcessorBuilder`].
pub trait EventProcessorFactory {
/// Constructs an event processor.
///
/// * `endpoints` - service endpoints; `EventProcessorBuilder` reads the events
///   base URL from it.
/// * `sdk_key` - handed to the event sender by `EventProcessorBuilder`
///   (presumably used to authenticate requests; confirm in `HyperEventSender`).
/// * `tags` - optional value; when present, `EventProcessorBuilder` registers it
///   as a default header under `LAUNCHDARKLY_TAGS_HEADER`.
///
/// # Errors
///
/// Returns a [`BuildError`] when the processor cannot be constructed.
fn build(
&self,
endpoints: &service_endpoints::ServiceEndpoints,
sdk_key: &str,
tags: Option<String>,
) -> Result<Arc<dyn EventProcessor>, BuildError>;
/// Returns this factory as an owned, boxed trait object. The implementations
/// in this file do so by cloning `self`.
fn to_owned(&self) -> Box<dyn EventProcessorFactory>;
}
/// Configurable factory for the event processor.
///
/// Each field is copied into an `EventsConfiguration` when `build` is called.
/// Create one with [`EventProcessorBuilder::new`] and adjust it through the
/// setter methods, each of which returns `&mut Self` for chaining.
#[derive(Clone)]
pub struct EventProcessorBuilder {
// Forwarded as `EventsConfiguration::capacity`; presumably the size of the
// pending-event buffer (confirm in the processor implementation).
capacity: usize,
// Forwarded as `EventsConfiguration::flush_interval`.
flush_interval: Duration,
// Forwarded as `EventsConfiguration::context_keys_capacity`; the type rules out zero.
context_keys_capacity: NonZeroUsize,
// Forwarded as `EventsConfiguration::context_keys_flush_interval`.
context_keys_flush_interval: Duration,
// Optional sender override. When `None`, `build` creates a `HyperEventSender`.
event_sender: Option<Arc<dyn EventSender>>,
// Forwarded as `EventsConfiguration::all_attributes_private`.
all_attributes_private: bool,
// Attribute references forwarded as `EventsConfiguration::private_attributes`.
private_attributes: HashSet<Reference>,
// Callback forwarded as `EventsConfiguration::on_success`; defaults to a no-op.
on_success: OnEventSenderResultSuccess,
}
impl EventProcessorFactory for EventProcessorBuilder {
fn build(
&self,
endpoints: &service_endpoints::ServiceEndpoints,
sdk_key: &str,
tags: Option<String>,
) -> Result<Arc<dyn EventProcessor>, BuildError> {
let url_string = format!("{}/bulk", endpoints.events_base_url());
let mut default_headers = HashMap::<&str, String>::new();
if let Some(tags) = tags {
default_headers.insert(LAUNCHDARKLY_TAGS_HEADER, tags);
}
let event_sender = match &self.event_sender {
Some(event_sender) => event_sender.clone(),
_ => {
#[cfg(feature = "rustls")]
let connector = HttpsConnectorBuilder::new()
.with_native_roots()
.https_or_http()
.enable_http1()
.enable_http2()
.build();
#[cfg(all(feature = "hypertls", not(feature = "rustls")))]
let connector = HttpsConnector::new();
Arc::new(HyperEventSender::new(
hyper::Client::builder().build(connector),
hyper::Uri::from_str(url_string.as_str()).unwrap(),
sdk_key,
default_headers,
))
}
};
let events_configuration = EventsConfiguration {
event_sender,
capacity: self.capacity,
flush_interval: self.flush_interval,
context_keys_capacity: self.context_keys_capacity,
context_keys_flush_interval: self.context_keys_flush_interval,
all_attributes_private: self.all_attributes_private,
private_attributes: self.private_attributes.clone(),
on_success: self.on_success.clone(),
};
let events_processor =
EventProcessorImpl::new(events_configuration).map_err(BuildError::FailedToStart)?;
Ok(Arc::new(events_processor))
}
fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
Box::new(self.clone())
}
}
impl EventProcessorBuilder {
    /// Creates a builder populated with the module's default settings: no custom
    /// event sender, no private attributes, and a no-op `on_success` callback.
    pub fn new() -> Self {
        // The default capacity constant is an `Option`; fall back to 1 if it
        // were ever `None`.
        let context_keys_capacity = match DEFAULT_CONTEXT_KEY_CAPACITY {
            Some(capacity) => capacity,
            None => NonZeroUsize::new(1).unwrap(),
        };

        Self {
            event_sender: None,
            capacity: DEFAULT_EVENT_CAPACITY,
            flush_interval: DEFAULT_FLUSH_POLL_INTERVAL,
            context_keys_capacity,
            context_keys_flush_interval: DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL,
            all_attributes_private: false,
            private_attributes: HashSet::new(),
            on_success: Arc::new(|_| ()),
        }
    }

    /// Sets the event capacity and returns the builder for chaining.
    pub fn capacity(&mut self, capacity: usize) -> &mut Self {
        self.capacity = capacity;
        self
    }

    /// Sets the flush interval and returns the builder for chaining.
    pub fn flush_interval(&mut self, flush_interval: Duration) -> &mut Self {
        self.flush_interval = flush_interval;
        self
    }

    /// Sets the context-keys capacity (never zero, by type) and returns the
    /// builder for chaining.
    pub fn context_keys_capacity(&mut self, context_keys_capacity: NonZeroUsize) -> &mut Self {
        self.context_keys_capacity = context_keys_capacity;
        self
    }

    /// Sets the context-keys flush interval and returns the builder for chaining.
    pub fn context_keys_flush_interval(
        &mut self,
        context_keys_flush_interval: Duration,
    ) -> &mut Self {
        self.context_keys_flush_interval = context_keys_flush_interval;
        self
    }

    /// Sets the `all_attributes_private` flag and returns the builder for chaining.
    pub fn all_attributes_private(&mut self, all_attributes_private: bool) -> &mut Self {
        self.all_attributes_private = all_attributes_private;
        self
    }

    /// Replaces the set of private attributes, converting every element into a
    /// [`Reference`], and returns the builder for chaining.
    pub fn private_attributes<R>(&mut self, attributes: HashSet<R>) -> &mut Self
    where
        R: Into<Reference>,
    {
        let references: HashSet<Reference> = attributes.into_iter().map(Into::into).collect();
        self.private_attributes = references;
        self
    }

    /// Replaces the `on_success` callback and returns the builder for chaining.
    pub fn on_success(&mut self, on_success: OnEventSenderResultSuccess) -> &mut Self {
        self.on_success = on_success;
        self
    }

    /// Test-only hook: injects the event sender that `build` will use in place
    /// of the default hyper-based sender.
    #[cfg(test)]
    pub fn event_sender(&mut self, event_sender: Arc<dyn EventSender>) -> &mut Self {
        self.event_sender = Some(event_sender);
        self
    }
}
impl Default for EventProcessorBuilder {
fn default() -> Self {
Self::new()
}
}
/// Field-less factory whose `build` returns a `NullEventProcessor` and ignores
/// all of its arguments.
#[derive(Clone)]
pub struct NullEventProcessorBuilder {}
impl EventProcessorFactory for NullEventProcessorBuilder {
    /// Always succeeds with a fresh [`NullEventProcessor`]; the endpoints, SDK key
    /// and tags are all ignored.
    fn build(
        &self,
        _endpoints: &service_endpoints::ServiceEndpoints,
        _sdk_key: &str,
        _tags: Option<String>,
    ) -> Result<Arc<dyn EventProcessor>, BuildError> {
        let processor: Arc<dyn EventProcessor> = Arc::new(NullEventProcessor::new());
        Ok(processor)
    }

    /// Returns a boxed clone of this builder as a trait object.
    fn to_owned(&self) -> Box<dyn EventProcessorFactory> {
        let copy: NullEventProcessorBuilder = self.clone();
        Box::new(copy)
    }
}
impl NullEventProcessorBuilder {
pub fn new() -> Self {
Self {}
}
}
impl Default for NullEventProcessorBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use launchdarkly_server_sdk_evaluation::ContextBuilder;
    use maplit::hashset;
    use mockito::{mock, Matcher};
    use test_case::test_case;

    use crate::{events::event::EventFactory, ServiceEndpointsBuilder};

    use super::*;

    // Every default set by `EventProcessorBuilder::new` is pinned here, so that a
    // change to any of them fails a test.
    #[test]
    fn default_builder_has_correct_defaults() {
        let builder = EventProcessorBuilder::new();
        assert_eq!(builder.capacity, DEFAULT_EVENT_CAPACITY);
        assert_eq!(builder.flush_interval, DEFAULT_FLUSH_POLL_INTERVAL);
        assert_eq!(
            Some(builder.context_keys_capacity),
            DEFAULT_CONTEXT_KEY_CAPACITY
        );
        assert_eq!(
            builder.context_keys_flush_interval,
            DEFAULT_CONTEXT_KEYS_FLUSH_INTERVAL
        );
        assert!(!builder.all_attributes_private);
        assert!(builder.private_attributes.is_empty());
        assert!(builder.event_sender.is_none());
    }

    #[test]
    fn capacity_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::new();
        builder.capacity(1234);
        assert_eq!(builder.capacity, 1234);
    }

    #[test]
    fn flush_interval_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::new();
        builder.flush_interval(Duration::from_secs(1234));
        assert_eq!(builder.flush_interval, Duration::from_secs(1234));
    }

    #[test]
    fn context_keys_capacity_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::new();
        let cap = NonZeroUsize::new(1234).expect("1234 > 0");
        builder.context_keys_capacity(cap);
        assert_eq!(builder.context_keys_capacity, cap);
    }

    #[test]
    fn context_keys_flush_interval_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::new();
        builder.context_keys_flush_interval(Duration::from_secs(1000));
        assert_eq!(
            builder.context_keys_flush_interval,
            Duration::from_secs(1000)
        );
    }

    #[test]
    fn all_attribute_private_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::new();
        assert!(!builder.all_attributes_private);
        builder.all_attributes_private(true);
        assert!(builder.all_attributes_private);
    }

    #[test]
    fn attribute_names_can_be_adjusted() {
        let mut builder = EventProcessorBuilder::new();
        assert!(builder.private_attributes.is_empty());
        builder.private_attributes(hashset!["name"]);
        assert!(builder.private_attributes.contains(&"name".into()));
    }

    // Builds a processor against a mock server and checks that the tags header
    // is sent when `tag` is `Some` and absent when it is `None`.
    #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")]
    #[test_case(None, Matcher::Missing)]
    fn processor_sends_correct_headers(tag: Option<String>, matcher: impl Into<Matcher>) {
        let mock_endpoint = mock("POST", "/bulk")
            .with_status(200)
            .expect_at_least(1)
            .match_header(LAUNCHDARKLY_TAGS_HEADER, matcher)
            .create();

        let service_endpoints = ServiceEndpointsBuilder::new()
            .events_base_url(&mockito::server_url())
            .polling_base_url(&mockito::server_url())
            .streaming_base_url(&mockito::server_url())
            .build()
            .expect("Service endpoints failed to be created");

        let builder = EventProcessorBuilder::new();
        let processor = builder
            .build(&service_endpoints, "sdk-key", tag)
            .expect("Processor failed to build");

        let event_factory = EventFactory::new(false);

        let context = ContextBuilder::new("bob")
            .build()
            .expect("Failed to create context");
        let identify_event = event_factory.new_identify(context);

        processor.send(identify_event);
        // Closing the processor before asserting on the mock; presumably this
        // flushes the pending event (confirm in the processor implementation).
        processor.close();

        mock_endpoint.assert();
    }
}