1use super::service_endpoints;
2use crate::data_source::{DataSource, NullDataSource, PollingDataSource, StreamingDataSource};
3use crate::feature_requester_builders::{FeatureRequesterFactory, HyperFeatureRequesterBuilder};
4use hyper::{client::connect::Connection, service::Service, Uri};
5#[cfg(feature = "rustls")]
6use hyper_rustls::HttpsConnectorBuilder;
7use std::sync::{Arc, Mutex};
8use std::time::Duration;
9use thiserror::Error;
10use tokio::io::{AsyncRead, AsyncWrite};
11
12#[cfg(test)]
13use super::data_source;
14
15#[non_exhaustive]
17#[derive(Debug, Error)]
18pub enum BuildError {
19    #[error("data source factory failed to build: {0}")]
21    InvalidConfig(String),
22}
23
/// Initial reconnect delay a [`StreamingDataSourceBuilder`] starts with
/// until the caller overrides it.
const DEFAULT_INITIAL_RECONNECT_DELAY: Duration = Duration::from_secs(1);

/// Smallest poll interval a [`PollingDataSourceBuilder`] accepts; shorter
/// requests are raised to this value. It is also the interval a new polling
/// builder starts with.
const MINIMUM_POLL_INTERVAL: Duration = Duration::from_secs(30);
26
27pub trait DataSourceFactory {
29    fn build(
30        &self,
31        endpoints: &service_endpoints::ServiceEndpoints,
32        sdk_key: &str,
33        tags: Option<String>,
34    ) -> Result<Arc<dyn DataSource>, BuildError>;
35    fn to_owned(&self) -> Box<dyn DataSourceFactory>;
36}
37
/// Collects the settings needed to create a streaming data source.
///
/// `C` is the type of an optional, caller-supplied connector.
#[derive(Clone)]
pub struct StreamingDataSourceBuilder<C> {
    /// Delay forwarded to the streaming data source as its initial reconnect
    /// delay.
    initial_reconnect_delay: Duration,
    /// Caller-supplied connector; `None` until one is explicitly set.
    connector: Option<C>,
}
63
64impl<C> StreamingDataSourceBuilder<C> {
65    pub fn new() -> Self {
67        Self {
68            initial_reconnect_delay: DEFAULT_INITIAL_RECONNECT_DELAY,
69            connector: None,
70        }
71    }
72
73    pub fn initial_reconnect_delay(&mut self, duration: Duration) -> &mut Self {
75        self.initial_reconnect_delay = duration;
76        self
77    }
78
79    pub fn https_connector(&mut self, connector: C) -> &mut Self {
84        self.connector = Some(connector);
85        self
86    }
87}
88
89impl<C> DataSourceFactory for StreamingDataSourceBuilder<C>
90where
91    C: Service<Uri> + Clone + Send + Sync + 'static,
92    C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
93    C::Future: Send + 'static,
94    C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
95{
96    fn build(
97        &self,
98        endpoints: &service_endpoints::ServiceEndpoints,
99        sdk_key: &str,
100        tags: Option<String>,
101    ) -> Result<Arc<dyn DataSource>, BuildError> {
102        let data_source_result = match &self.connector {
103            #[cfg(feature = "rustls")]
104            None => {
105                let connector = HttpsConnectorBuilder::new()
106                    .with_native_roots()
107                    .https_or_http()
108                    .enable_http1()
109                    .enable_http2()
110                    .build();
111                Ok(StreamingDataSource::new(
112                    endpoints.streaming_base_url(),
113                    sdk_key,
114                    self.initial_reconnect_delay,
115                    &tags,
116                    connector,
117                ))
118            }
119            #[cfg(not(feature = "rustls"))]
120            None => Err(BuildError::InvalidConfig(
121                "https connector required when rustls is disabled".into(),
122            )),
123            Some(connector) => Ok(StreamingDataSource::new(
124                endpoints.streaming_base_url(),
125                sdk_key,
126                self.initial_reconnect_delay,
127                &tags,
128                connector.clone(),
129            )),
130        };
131        let data_source = data_source_result?
132            .map_err(|e| BuildError::InvalidConfig(format!("invalid stream_base_url: {:?}", e)))?;
133        Ok(Arc::new(data_source))
134    }
135
136    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
137        Box::new(self.clone())
138    }
139}
140
141impl<C> Default for StreamingDataSourceBuilder<C> {
142    fn default() -> Self {
143        StreamingDataSourceBuilder::new()
144    }
145}
146
/// Builder for a `NullDataSource`. It carries no configuration.
#[derive(Clone)]
pub struct NullDataSourceBuilder {}
149
150impl NullDataSourceBuilder {
151    pub fn new() -> Self {
152        Self {}
153    }
154}
155
156impl DataSourceFactory for NullDataSourceBuilder {
157    fn build(
158        &self,
159        _: &service_endpoints::ServiceEndpoints,
160        _: &str,
161        _: Option<String>,
162    ) -> Result<Arc<dyn DataSource>, BuildError> {
163        Ok(Arc::new(NullDataSource::new()))
164    }
165
166    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
167        Box::new(self.clone())
168    }
169}
170
171impl Default for NullDataSourceBuilder {
172    fn default() -> Self {
173        NullDataSourceBuilder::new()
174    }
175}
176
/// Collects the settings needed to create a polling data source.
///
/// `C` is the type of an optional, caller-supplied connector.
#[derive(Clone)]
pub struct PollingDataSourceBuilder<C> {
    /// Interval handed to the polling data source when it is built.
    poll_interval: Duration,
    /// Caller-supplied connector; `None` until one is explicitly set.
    connector: Option<C>,
}
205
206impl<C> PollingDataSourceBuilder<C> {
231    pub fn new() -> Self {
233        Self {
234            poll_interval: MINIMUM_POLL_INTERVAL,
235            connector: None,
236        }
237    }
238
239    pub fn poll_interval(&mut self, poll_interval: Duration) -> &mut Self {
244        self.poll_interval = std::cmp::max(poll_interval, MINIMUM_POLL_INTERVAL);
245        self
246    }
247
248    pub fn https_connector(&mut self, connector: C) -> &mut Self {
253        self.connector = Some(connector);
254        self
255    }
256}
257
258impl<C> DataSourceFactory for PollingDataSourceBuilder<C>
259where
260    C: Service<Uri> + Clone + Send + Sync + 'static,
261    C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
262    C::Future: Send + Unpin + 'static,
263    C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
264{
265    fn build(
266        &self,
267        endpoints: &service_endpoints::ServiceEndpoints,
268        sdk_key: &str,
269        tags: Option<String>,
270    ) -> Result<Arc<dyn DataSource>, BuildError> {
271        let feature_requester_builder: Result<Box<dyn FeatureRequesterFactory>, BuildError> =
272            match &self.connector {
273                #[cfg(feature = "rustls")]
274                None => {
275                    let connector = HttpsConnectorBuilder::new()
276                        .with_native_roots()
277                        .https_or_http()
278                        .enable_http1()
279                        .enable_http2()
280                        .build();
281
282                    Ok(Box::new(HyperFeatureRequesterBuilder::new(
283                        endpoints.polling_base_url(),
284                        sdk_key,
285                        connector,
286                    )))
287                }
288                #[cfg(not(feature = "rustls"))]
289                None => Err(BuildError::InvalidConfig(
290                    "https connector required when rustls is disabled".into(),
291                )),
292                Some(connector) => Ok(Box::new(HyperFeatureRequesterBuilder::new(
293                    endpoints.polling_base_url(),
294                    sdk_key,
295                    connector.clone(),
296                ))),
297            };
298
299        let feature_requester_factory: Arc<Mutex<Box<dyn FeatureRequesterFactory>>> =
300            Arc::new(Mutex::new(feature_requester_builder?));
301
302        let data_source =
303            PollingDataSource::new(feature_requester_factory, self.poll_interval, tags);
304        Ok(Arc::new(data_source))
305    }
306
307    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
308        Box::new(self.clone())
309    }
310}
311
312impl<C> Default for PollingDataSourceBuilder<C> {
313    fn default() -> Self {
314        PollingDataSourceBuilder::new()
315    }
316}
317
/// Test-only factory that hands out a pre-made mock data source.
#[cfg(test)]
#[derive(Clone)]
pub(crate) struct MockDataSourceBuilder {
    /// The mock to return from `build`; `None` until one is supplied.
    data_source: Option<Arc<data_source::MockDataSource>>,
}
324
#[cfg(test)]
impl MockDataSourceBuilder {
    /// Creates a builder with no mock data source configured yet.
    pub fn new() -> Self {
        Self { data_source: None }
    }

    /// Sets the mock data source that `build` will return. Returns the
    /// builder so calls can be chained.
    pub fn data_source(&mut self, data_source: Arc<data_source::MockDataSource>) -> &mut Self {
        let configured = Some(data_source);
        self.data_source = configured;
        self
    }
}
339
#[cfg(test)]
impl DataSourceFactory for MockDataSourceBuilder {
    /// Returns another handle to the configured mock data source; all
    /// arguments are ignored.
    ///
    /// # Panics
    ///
    /// Panics if no mock data source has been set on the builder.
    fn build(
        &self,
        _endpoints: &service_endpoints::ServiceEndpoints,
        _sdk_key: &str,
        _tags: Option<String>,
    ) -> Result<Arc<dyn DataSource>, BuildError> {
        let configured = self.data_source.as_ref().unwrap();
        Ok(configured.clone())
    }

    /// Returns a boxed clone of this builder as a trait object.
    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
        let duplicate = self.clone();
        Box::new(duplicate)
    }
}
355
#[cfg(test)]
mod tests {
    use hyper::client::HttpConnector;

    use super::*;

    // A new streaming builder starts with the default reconnect delay.
    #[test]
    fn default_stream_builder_has_correct_defaults() {
        let builder: StreamingDataSourceBuilder<HttpConnector> = StreamingDataSourceBuilder::new();

        assert_eq!(
            builder.initial_reconnect_delay,
            DEFAULT_INITIAL_RECONNECT_DELAY
        );
    }

    // Building succeeds with a caller-supplied connector type.
    #[test]
    fn stream_builder_can_use_custom_connector() {
        #[derive(Debug, Clone)]
        struct TestConnector;
        impl hyper::service::Service<hyper::Uri> for TestConnector {
            type Response = tokio::net::TcpStream;
            type Error = std::io::Error;
            type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

            fn poll_ready(
                &mut self,
                _cx: &mut std::task::Context<'_>,
            ) -> std::task::Poll<Result<(), Self::Error>> {
                std::task::Poll::Ready(Ok(()))
            }

            fn call(&mut self, _req: hyper::Uri) -> Self::Future {
                // Not expected to be invoked: the test only builds the data
                // source and checks that the build succeeded.
                unreachable!();
            }
        }

        let mut builder = StreamingDataSourceBuilder::new();
        builder.https_connector(TestConnector);
        assert!(builder
            .build(
                &crate::ServiceEndpointsBuilder::new().build().unwrap(),
                "test",
                None
            )
            .is_ok());
    }

    // A new polling builder starts at the minimum poll interval.
    #[test]
    fn default_polling_builder_has_correct_defaults() {
        let builder = PollingDataSourceBuilder::<HttpConnector>::new();
        assert_eq!(builder.poll_interval, MINIMUM_POLL_INTERVAL);
    }

    // Requests for an interval shorter than the minimum are raised to the minimum.
    #[test]
    fn poll_interval_below_minimum_is_raised_to_minimum() {
        let mut builder = PollingDataSourceBuilder::<HttpConnector>::new();
        builder.poll_interval(MINIMUM_POLL_INTERVAL / 2);
        assert_eq!(builder.poll_interval, MINIMUM_POLL_INTERVAL);
    }

    // Intervals longer than the minimum are stored unchanged.
    #[test]
    fn poll_interval_above_minimum_is_kept() {
        let requested = MINIMUM_POLL_INTERVAL + Duration::from_secs(1);
        let mut builder = PollingDataSourceBuilder::<HttpConnector>::new();
        builder.poll_interval(requested);
        assert_eq!(builder.poll_interval, requested);
    }

    // The streaming reconnect delay can be overridden.
    #[test]
    fn initial_reconnect_delay_for_streaming_can_be_adjusted() {
        let mut builder = StreamingDataSourceBuilder::<()>::new();
        builder.initial_reconnect_delay(Duration::from_secs(1234));
        assert_eq!(builder.initial_reconnect_delay, Duration::from_secs(1234));
    }
}