1use super::service_endpoints;
2use crate::data_source::{DataSource, NullDataSource, PollingDataSource, StreamingDataSource};
3use crate::feature_requester_builders::{FeatureRequesterFactory, HyperFeatureRequesterBuilder};
4use hyper::{client::connect::Connection, service::Service, Uri};
5#[cfg(feature = "rustls")]
6use hyper_rustls::HttpsConnectorBuilder;
7use std::sync::{Arc, Mutex};
8use std::time::Duration;
9use thiserror::Error;
10use tokio::io::{AsyncRead, AsyncWrite};
11
12#[cfg(test)]
13use super::data_source;
14
/// Error type returned by [`DataSourceFactory::build`] when a data source cannot be constructed.
///
/// The enum is `#[non_exhaustive]`, so further variants may be added later; code matching on it
/// from outside this crate must include a wildcard arm.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum BuildError {
    /// The configuration handed to the factory could not be used. The wrapped string describes
    /// the problem and is appended to the rendered error message.
    #[error("data source factory failed to build: {0}")]
    InvalidConfig(String),
}
23
/// Reconnect delay (one second) that `StreamingDataSourceBuilder::new` starts with.
const DEFAULT_INITIAL_RECONNECT_DELAY: Duration = Duration::from_secs(1);
/// Polling interval (thirty seconds) that `PollingDataSourceBuilder::new` starts with; it is also
/// the lower bound enforced by `PollingDataSourceBuilder::poll_interval`.
const MINIMUM_POLL_INTERVAL: Duration = Duration::from_secs(30);
26
/// Factory abstraction for creating a [`DataSource`].
///
/// The streaming, polling and null builders in this module all implement it.
pub trait DataSourceFactory {
    /// Builds a data source from the given service endpoints, SDK key and optional tags.
    ///
    /// # Errors
    ///
    /// Returns a [`BuildError`] when the data source cannot be constructed.
    fn build(
        &self,
        endpoints: &service_endpoints::ServiceEndpoints,
        sdk_key: &str,
        tags: Option<String>,
    ) -> Result<Arc<dyn DataSource>, BuildError>;
    /// Returns a boxed factory. The implementations in this file return a boxed clone of `self`.
    fn to_owned(&self) -> Box<dyn DataSourceFactory>;
}
37
/// Builder whose [`DataSourceFactory`] implementation produces a [`StreamingDataSource`].
///
/// `C` is the type of the optional custom connector. The `DataSourceFactory` implementation
/// requires it to be a cloneable hyper `Service<Uri>`.
#[derive(Clone)]
pub struct StreamingDataSourceBuilder<C> {
    // Handed unchanged to `StreamingDataSource::new` when the data source is built.
    initial_reconnect_delay: Duration,
    // Caller-supplied connector; `None` until `https_connector` is called.
    connector: Option<C>,
}
63
64impl<C> StreamingDataSourceBuilder<C> {
65 pub fn new() -> Self {
67 Self {
68 initial_reconnect_delay: DEFAULT_INITIAL_RECONNECT_DELAY,
69 connector: None,
70 }
71 }
72
73 pub fn initial_reconnect_delay(&mut self, duration: Duration) -> &mut Self {
75 self.initial_reconnect_delay = duration;
76 self
77 }
78
79 pub fn https_connector(&mut self, connector: C) -> &mut Self {
84 self.connector = Some(connector);
85 self
86 }
87}
88
89impl<C> DataSourceFactory for StreamingDataSourceBuilder<C>
90where
91 C: Service<Uri> + Clone + Send + Sync + 'static,
92 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
93 C::Future: Send + 'static,
94 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
95{
96 fn build(
97 &self,
98 endpoints: &service_endpoints::ServiceEndpoints,
99 sdk_key: &str,
100 tags: Option<String>,
101 ) -> Result<Arc<dyn DataSource>, BuildError> {
102 let data_source_result = match &self.connector {
103 #[cfg(feature = "rustls")]
104 None => {
105 let connector = HttpsConnectorBuilder::new()
106 .with_native_roots()
107 .https_or_http()
108 .enable_http1()
109 .enable_http2()
110 .build();
111 Ok(StreamingDataSource::new(
112 endpoints.streaming_base_url(),
113 sdk_key,
114 self.initial_reconnect_delay,
115 &tags,
116 connector,
117 ))
118 }
119 #[cfg(not(feature = "rustls"))]
120 None => Err(BuildError::InvalidConfig(
121 "https connector required when rustls is disabled".into(),
122 )),
123 Some(connector) => Ok(StreamingDataSource::new(
124 endpoints.streaming_base_url(),
125 sdk_key,
126 self.initial_reconnect_delay,
127 &tags,
128 connector.clone(),
129 )),
130 };
131 let data_source = data_source_result?
132 .map_err(|e| BuildError::InvalidConfig(format!("invalid stream_base_url: {:?}", e)))?;
133 Ok(Arc::new(data_source))
134 }
135
136 fn to_owned(&self) -> Box<dyn DataSourceFactory> {
137 Box::new(self.clone())
138 }
139}
140
141impl<C> Default for StreamingDataSourceBuilder<C> {
142 fn default() -> Self {
143 StreamingDataSourceBuilder::new()
144 }
145}
146
/// Builder whose [`DataSourceFactory`] implementation produces a [`NullDataSource`].
///
/// It carries no configuration.
#[derive(Clone)]
pub struct NullDataSourceBuilder {}
149
150impl NullDataSourceBuilder {
151 pub fn new() -> Self {
152 Self {}
153 }
154}
155
156impl DataSourceFactory for NullDataSourceBuilder {
157 fn build(
158 &self,
159 _: &service_endpoints::ServiceEndpoints,
160 _: &str,
161 _: Option<String>,
162 ) -> Result<Arc<dyn DataSource>, BuildError> {
163 Ok(Arc::new(NullDataSource::new()))
164 }
165
166 fn to_owned(&self) -> Box<dyn DataSourceFactory> {
167 Box::new(self.clone())
168 }
169}
170
171impl Default for NullDataSourceBuilder {
172 fn default() -> Self {
173 NullDataSourceBuilder::new()
174 }
175}
176
/// Builder whose [`DataSourceFactory`] implementation produces a [`PollingDataSource`].
///
/// `C` is the type of the optional custom connector. The `DataSourceFactory` implementation
/// requires it to be a cloneable hyper `Service<Uri>`.
#[derive(Clone)]
pub struct PollingDataSourceBuilder<C> {
    // Handed to `PollingDataSource::new`; the setter never lets it drop below
    // `MINIMUM_POLL_INTERVAL`.
    poll_interval: Duration,
    // Caller-supplied connector; `None` until `https_connector` is called.
    connector: Option<C>,
}
205
206impl<C> PollingDataSourceBuilder<C> {
231 pub fn new() -> Self {
233 Self {
234 poll_interval: MINIMUM_POLL_INTERVAL,
235 connector: None,
236 }
237 }
238
239 pub fn poll_interval(&mut self, poll_interval: Duration) -> &mut Self {
244 self.poll_interval = std::cmp::max(poll_interval, MINIMUM_POLL_INTERVAL);
245 self
246 }
247
248 pub fn https_connector(&mut self, connector: C) -> &mut Self {
253 self.connector = Some(connector);
254 self
255 }
256}
257
258impl<C> DataSourceFactory for PollingDataSourceBuilder<C>
259where
260 C: Service<Uri> + Clone + Send + Sync + 'static,
261 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
262 C::Future: Send + Unpin + 'static,
263 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
264{
265 fn build(
266 &self,
267 endpoints: &service_endpoints::ServiceEndpoints,
268 sdk_key: &str,
269 tags: Option<String>,
270 ) -> Result<Arc<dyn DataSource>, BuildError> {
271 let feature_requester_builder: Result<Box<dyn FeatureRequesterFactory>, BuildError> =
272 match &self.connector {
273 #[cfg(feature = "rustls")]
274 None => {
275 let connector = HttpsConnectorBuilder::new()
276 .with_native_roots()
277 .https_or_http()
278 .enable_http1()
279 .enable_http2()
280 .build();
281
282 Ok(Box::new(HyperFeatureRequesterBuilder::new(
283 endpoints.polling_base_url(),
284 sdk_key,
285 connector,
286 )))
287 }
288 #[cfg(not(feature = "rustls"))]
289 None => Err(BuildError::InvalidConfig(
290 "https connector required when rustls is disabled".into(),
291 )),
292 Some(connector) => Ok(Box::new(HyperFeatureRequesterBuilder::new(
293 endpoints.polling_base_url(),
294 sdk_key,
295 connector.clone(),
296 ))),
297 };
298
299 let feature_requester_factory: Arc<Mutex<Box<dyn FeatureRequesterFactory>>> =
300 Arc::new(Mutex::new(feature_requester_builder?));
301
302 let data_source =
303 PollingDataSource::new(feature_requester_factory, self.poll_interval, tags);
304 Ok(Arc::new(data_source))
305 }
306
307 fn to_owned(&self) -> Box<dyn DataSourceFactory> {
308 Box::new(self.clone())
309 }
310}
311
312impl<C> Default for PollingDataSourceBuilder<C> {
313 fn default() -> Self {
314 PollingDataSourceBuilder::new()
315 }
316}
317
/// Test-only builder that hands out a pre-made `MockDataSource`.
#[cfg(test)]
#[derive(Clone)]
pub(crate) struct MockDataSourceBuilder {
    // The mock returned by `build`; `None` until `data_source` is called.
    data_source: Option<Arc<data_source::MockDataSource>>,
}
324
#[cfg(test)]
impl MockDataSourceBuilder {
    /// Creates a builder that has no mock data source assigned yet.
    pub fn new() -> MockDataSourceBuilder {
        Self { data_source: None }
    }

    /// Stores the mock data source that `build` will return, replacing any earlier one.
    ///
    /// Returns the builder so that further calls can be chained.
    pub fn data_source(
        &mut self,
        data_source: Arc<data_source::MockDataSource>,
    ) -> &mut MockDataSourceBuilder {
        let builder = self;
        builder.data_source = Some(data_source);
        builder
    }
}
339
#[cfg(test)]
impl DataSourceFactory for MockDataSourceBuilder {
    /// Returns another handle to the stored mock data source; all arguments are ignored.
    ///
    /// # Panics
    ///
    /// Panics if no mock was assigned through `data_source` beforehand.
    fn build(
        &self,
        _endpoints: &service_endpoints::ServiceEndpoints,
        _sdk_key: &str,
        _tags: Option<String>,
    ) -> Result<Arc<dyn DataSource>, BuildError> {
        // Cloning the `Option` only bumps the `Arc` reference count when a mock is present.
        let mock = self.data_source.clone().unwrap();
        Ok(mock)
    }

    /// Returns a boxed clone of this builder as a trait object.
    fn to_owned(&self) -> Box<dyn DataSourceFactory> {
        let copy = self.clone();
        Box::new(copy)
    }
}
355
#[cfg(test)]
mod tests {
    use hyper::client::HttpConnector;

    use super::*;

    // A fresh streaming builder starts with the default reconnect delay.
    #[test]
    fn default_stream_builder_has_correct_defaults() {
        let builder = StreamingDataSourceBuilder::<HttpConnector>::new();

        assert_eq!(
            builder.initial_reconnect_delay,
            DEFAULT_INITIAL_RECONNECT_DELAY
        );
    }

    // Building succeeds with a user-defined connector type.
    #[test]
    fn stream_builder_can_use_custom_connector() {
        use std::task::{Context, Poll};

        // Minimal connector: always ready, and never expected to be called by this test.
        #[derive(Debug, Clone)]
        struct TestConnector;

        impl hyper::service::Service<hyper::Uri> for TestConnector {
            type Response = tokio::net::TcpStream;
            type Error = std::io::Error;
            type Future = futures::future::BoxFuture<'static, Result<Self::Response, Self::Error>>;

            fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
                Poll::Ready(Ok(()))
            }

            fn call(&mut self, _req: hyper::Uri) -> Self::Future {
                unreachable!()
            }
        }

        let mut builder = StreamingDataSourceBuilder::new();
        builder.https_connector(TestConnector);

        let endpoints = crate::ServiceEndpointsBuilder::new().build().unwrap();
        let outcome = builder.build(&endpoints, "test", None);
        assert!(outcome.is_ok());
    }

    // A fresh polling builder starts at the minimum poll interval.
    #[test]
    fn default_polling_builder_has_correct_defaults() {
        let builder: PollingDataSourceBuilder<HttpConnector> = PollingDataSourceBuilder::new();
        assert_eq!(builder.poll_interval, MINIMUM_POLL_INTERVAL);
    }

    // The setter stores the delay it is given.
    #[test]
    fn initial_reconnect_delay_for_streaming_can_be_adjusted() {
        let mut builder: StreamingDataSourceBuilder<()> = StreamingDataSourceBuilder::new();
        builder.initial_reconnect_delay(Duration::from_secs(1234));
        assert_eq!(builder.initial_reconnect_delay, Duration::from_secs(1234));
    }
}