1use super::stores::store_types::{AllData, DataKind, PatchTarget, StorageItem};
2use crate::feature_requester::FeatureRequesterError;
3use crate::feature_requester_builders::FeatureRequesterFactory;
4use crate::reqwest::is_http_error_recoverable;
5use crate::stores::store::{DataStore, UpdateError};
6use crate::LAUNCHDARKLY_TAGS_HEADER;
7use es::{Client, ClientBuilder, ReconnectOptionsBuilder};
8use eventsource_client as es;
9use futures::StreamExt;
10use hyper::client::connect::Connection;
11use hyper::service::Service;
12use hyper::Uri;
13use launchdarkly_server_sdk_evaluation::{Flag, Segment};
14use parking_lot::RwLock;
15use serde::Deserialize;
16use std::sync::{Arc, Mutex, Once};
17use std::time::Duration;
18use tokio::io::{AsyncRead, AsyncWrite};
19use tokio::sync::broadcast;
20use tokio::time;
21use tokio_stream::wrappers::{BroadcastStream, IntervalStream};
22
/// Path prefix that marks a patch/delete event as targeting a feature flag.
const FLAGS_PREFIX: &str = "/flags/";
/// Path prefix that marks a patch/delete event as targeting a segment.
const SEGMENTS_PREFIX: &str = "/segments/";
25
26#[derive(Debug)]
27#[allow(clippy::enum_variant_names, dead_code)]
28pub enum Error {
29 InvalidEventData {
30 event_type: String,
31 error: Box<dyn std::error::Error + Send>,
32 },
33 InvalidPath(String),
34 InvalidUpdate(UpdateError),
35 InvalidEventType(String),
36}
37
38pub type Result<T> = std::result::Result<T, Error>;
39
40#[derive(Deserialize)]
41pub(crate) struct PutData {
42 #[serde(default = "String::default")]
43 path: String,
44 data: AllData<Flag, Segment>,
45}
46
47#[derive(Deserialize)]
48pub(crate) struct PatchData {
49 pub path: String,
50 pub data: PatchTarget,
51}
52
53#[derive(Deserialize)]
54pub(crate) struct DeleteData {
55 path: String,
56 version: u64,
57}
58
59pub type EventReceived = Arc<dyn Fn(&es::SSE) + Send + Sync>;
60
61pub trait DataSource: Send + Sync {
65 fn subscribe(
66 &self,
67 data_store: Arc<RwLock<dyn DataStore>>,
68 init_complete: Arc<dyn Fn(bool) + Send + Sync>,
69 event_received: EventReceived,
70 shutdown_receiver: broadcast::Receiver<()>,
71 );
72}
73
74pub struct StreamingDataSource {
75 es_client: Box<dyn Client>,
76}
77
78impl StreamingDataSource {
79 pub fn new<C>(
80 base_url: &str,
81 sdk_key: &str,
82 initial_reconnect_delay: Duration,
83 tags: &Option<String>,
84 connector: C,
85 ) -> std::result::Result<Self, es::Error>
86 where
87 C: Service<Uri> + Clone + Send + Sync + 'static,
88 C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
89 C::Future: Send + 'static,
90 C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
91 {
92 let stream_url = format!("{}/all", base_url);
93
94 let client_builder = ClientBuilder::for_url(&stream_url)?;
95 let mut client_builder = client_builder
96 .reconnect(
97 ReconnectOptionsBuilder::new(true)
98 .retry_initial(true)
99 .delay(initial_reconnect_delay)
100 .delay_max(Duration::from_secs(30))
101 .build(),
102 )
103 .connect_timeout(Duration::from_secs(10))
104 .read_timeout(Duration::from_secs(300)) .header("Authorization", sdk_key)?
106 .header("User-Agent", &crate::USER_AGENT)?;
107
108 if let Some(tags) = tags {
109 client_builder = client_builder.header(LAUNCHDARKLY_TAGS_HEADER, tags)?;
110 }
111
112 Ok(Self {
113 es_client: Box::new(client_builder.build_with_conn(connector)),
114 })
115 }
116}
117
118impl DataSource for StreamingDataSource {
119 fn subscribe(
120 &self,
121 data_store: Arc<RwLock<dyn DataStore>>,
122 init_complete: Arc<dyn Fn(bool) + Send + Sync>,
123 event_received: EventReceived,
124 shutdown_receiver: broadcast::Receiver<()>,
125 ) {
126 let mut event_stream = self.es_client.stream().fuse();
127
128 tokio::spawn(async move {
129 let shutdown_stream = BroadcastStream::new(shutdown_receiver);
130 let mut shutdown_future = shutdown_stream.into_future();
131 let notify_init = Once::new();
132 let mut init_success = true;
133
134 loop {
135 futures::select! {
136 _ = shutdown_future => break,
137 event = event_stream.next() => {
138 let event = match event {
139 Some(Ok(event)) => {
140 event_received(&event);
141 match event {
142 es::SSE::Connected(_) => {
143 debug!("data source connected");
144 continue;
145 },
146 es::SSE::Comment(str)=> {
147 debug!("data source got a comment: {}", str);
148 continue;
149 },
150 es::SSE::Event(ev) => ev,
151 }
152 },
153 Some(Err(es::Error::UnexpectedResponse(response, _))) => {
154 match is_http_error_recoverable(response.status()) {
155 true => continue,
156 _ => {
157 notify_init.call_once(|| (init_complete)(false));
158 warn!("Returned unrecoverable failure. Unexpected response {}", response.status());
159 break
160 }
161 }
162 },
163 Some(Err(e)) => {
164 warn!("error on event stream: {:?}; assuming event stream will reconnect", e);
165 continue;
166 },
167 None => {
168 error!("unexpected end of event stream; terminating sync task; launchdarkly sync is now broken!");
177 break;
178 }
179 };
180
181 let data_store = data_store.clone();
182 let mut data_store = data_store.write();
183
184 debug!("data source got an event: {}", event.event_type);
185
186 let stored = match event.event_type.as_str() {
187 "put" => process_put(&mut *data_store, event),
188 "patch" => process_patch(&mut *data_store, event),
189 "delete" => process_delete(&mut *data_store, event),
190 _ => Err(Error::InvalidEventType(event.event_type)),
191 };
192 if let Err(e) = stored {
193 init_success = false;
194 error!("error processing update: {:?}", e);
195 }
196
197 notify_init.call_once(|| (init_complete)(init_success));
198 },
199 }
200 }
201 });
202 }
203}
204
205pub struct PollingDataSource {
206 feature_requester_factory: Arc<Mutex<Box<dyn FeatureRequesterFactory>>>,
207 poll_interval: Duration,
208 tags: Option<String>,
209}
210
211impl PollingDataSource {
212 pub fn new(
213 feature_requester_factory: Arc<Mutex<Box<dyn FeatureRequesterFactory>>>,
214 poll_interval: Duration,
215 tags: Option<String>,
216 ) -> Self {
217 Self {
218 feature_requester_factory,
219 poll_interval,
220 tags,
221 }
222 }
223}
224
225impl DataSource for PollingDataSource {
226 fn subscribe(
227 &self,
228 data_store: Arc<RwLock<dyn DataStore>>,
229 init_complete: Arc<dyn Fn(bool) + Send + Sync>,
230 _event_received: EventReceived,
231 shutdown_receiver: broadcast::Receiver<()>,
232 ) {
233 let mut feature_requester = match self.feature_requester_factory.lock() {
234 Ok(factory) => match factory.build(self.tags.clone()) {
235 Ok(requester) => requester,
236 Err(e) => {
237 error!("{:?}", e);
238 return;
239 }
240 },
241 Err(e) => {
242 error!("{:?}", e);
243 return;
244 }
245 };
246
247 let poll_interval = self.poll_interval;
248 tokio::spawn(async move {
249 let notify_init = Once::new();
250
251 let mut interval = IntervalStream::new(time::interval(poll_interval)).fuse();
252
253 let shutdown_stream = BroadcastStream::new(shutdown_receiver);
254 let mut shutdown_future = shutdown_stream.into_future();
255
256 loop {
257 futures::select! {
258 _ = interval.next() => {
259 match feature_requester.get_all().await {
260 Ok(all_data) => {
261 let mut data_store = data_store.write();
262 data_store.init(all_data);
263 notify_init.call_once(|| init_complete(true));
264 }
265 Err(FeatureRequesterError::Temporary) => {
266 warn!("feature requester has returned a temporary failure");
267 }
268 Err(FeatureRequesterError::Permanent) => {
269 error!("feature requester has returned a permanent failure");
270 notify_init.call_once(|| init_complete(false));
271 break;
272 }
273 };
274 },
275 _ = shutdown_future => break
276 }
277 }
278 });
279 }
280}
281
/// Data source that never delivers any data.
pub struct NullDataSource {}

impl NullDataSource {
    /// Creates a data source that does nothing.
    pub fn new() -> Self {
        Self {}
    }
}

impl Default for NullDataSource {
    /// Equivalent to [`NullDataSource::new`].
    fn default() -> Self {
        Self::new()
    }
}
289
290impl DataSource for NullDataSource {
291 fn subscribe(
292 &self,
293 _datastore: Arc<RwLock<dyn DataStore>>,
294 _init_complete: Arc<dyn Fn(bool) + Send + Sync>,
295 _event_received: EventReceived,
296 _shutdown_receiver: broadcast::Receiver<()>,
297 ) {
298 }
299}
300
/// Test-only data source that reports successful initialization, optionally after a delay.
#[cfg(test)]
pub(crate) struct MockDataSource {
    /// Delay before initialization is reported, in milliseconds; `0` reports immediately.
    delay_init: u64,
}
305
#[cfg(test)]
impl MockDataSource {
    /// Creates a mock that waits `delay_init` milliseconds before reporting initialization.
    pub fn new_with_init_delay(delay_init: u64) -> Self {
        Self { delay_init }
    }
}
312
#[cfg(test)]
impl DataSource for MockDataSource {
    /// Reports successful initialization and nothing else.
    ///
    /// With a zero delay `init_complete(true)` is called before this method returns;
    /// otherwise a Tokio task is spawned that sleeps for the configured number of
    /// milliseconds and then calls it.
    fn subscribe(
        &self,
        _datastore: Arc<RwLock<dyn DataStore>>,
        init_complete: Arc<dyn Fn(bool) + Send + Sync>,
        _event_received: EventReceived,
        _shutdown_receiver: broadcast::Receiver<()>,
    ) {
        match self.delay_init {
            0 => (init_complete)(true),
            delay_ms => {
                tokio::spawn(async move {
                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
                    (init_complete)(true);
                });
            }
        }
    }
}
333
334fn parse_event_data<'a, T: Deserialize<'a>>(event: &'a es::Event) -> Result<T> {
335 serde_json::from_slice(event.data.as_ref()).map_err(|e| Error::InvalidEventData {
336 event_type: event.event_type.clone(),
337 error: Box::new(e),
338 })
339}
340
341fn process_put(data_store: &mut dyn DataStore, event: es::Event) -> Result<()> {
342 let put: PutData = parse_event_data(&event)?;
343 if put.path == "/" || put.path.is_empty() {
344 data_store.init(put.data);
345 Ok(())
346 } else {
347 Err(Error::InvalidPath(put.path))
348 }
349}
350
351fn process_patch(data_store: &mut dyn DataStore, event: es::Event) -> Result<()> {
352 let patch: PatchData = parse_event_data(&event)?;
353 let (_, key) = path_to_key(&patch.path)?;
354
355 data_store
356 .upsert(key, patch.data)
357 .map_err(Error::InvalidUpdate)
358}
359
360fn process_delete(data_store: &mut dyn DataStore, event: es::Event) -> Result<()> {
361 let delete: DeleteData = parse_event_data(&event)?;
362 let (kind, key) = path_to_key(&delete.path)?;
363 let target = match kind {
364 DataKind::Flag => PatchTarget::Flag(StorageItem::Tombstone(delete.version)),
365 DataKind::Segment => PatchTarget::Segment(StorageItem::Tombstone(delete.version)),
366 };
367
368 data_store.upsert(key, target).map_err(Error::InvalidUpdate)
369}
370
371fn path_to_key(path: &str) -> Result<(DataKind, &str)> {
372 if let Some(flag_key) = path.strip_prefix(FLAGS_PREFIX) {
373 Ok((DataKind::Flag, flag_key))
374 } else if let Some(segment_key) = path.strip_prefix(SEGMENTS_PREFIX) {
375 Ok((DataKind::Segment, segment_key))
376 } else {
377 Err(Error::InvalidPath(path.to_string()))
378 }
379}
380
#[cfg(test)]
mod tests {
    use std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc, Mutex,
        },
        time::Duration,
    };

    use hyper::client::HttpConnector;
    use mockito::Matcher;
    use parking_lot::RwLock;
    use test_case::test_case;
    use tokio::sync::broadcast;

    use super::{DataSource, PollingDataSource, StreamingDataSource};
    use crate::feature_requester_builders::HyperFeatureRequesterBuilder;
    use crate::{stores::store::InMemoryDataStore, LAUNCHDARKLY_TAGS_HEADER};

    /// Maximum number of times the initialization flag is checked before giving up.
    const INIT_POLL_ATTEMPTS: usize = 10;
    /// Pause between two checks of the initialization flag.
    const INIT_POLL_INTERVAL: Duration = Duration::from_millis(100);

    /// Waits until `initialized` becomes true or roughly one second has elapsed.
    ///
    /// Sleeps with `tokio::time::sleep` so the async test yields to the runtime instead of
    /// blocking a thread while the data source task does its work.
    async fn wait_for_init(initialized: &AtomicBool) {
        for _ in 0..INIT_POLL_ATTEMPTS {
            if initialized.load(Ordering::SeqCst) {
                return;
            }

            tokio::time::sleep(INIT_POLL_INTERVAL).await;
        }
    }

    /// The streaming request carries the tags header exactly when tags were configured.
    #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")]
    #[test_case(None, Matcher::Missing)]
    #[tokio::test(flavor = "multi_thread")]
    async fn streaming_source_passes_along_tags_header(
        tag: Option<String>,
        matcher: impl Into<Matcher>,
    ) {
        let mut server = mockito::Server::new_async().await;
        let mock = server
            .mock("GET", "/all")
            .with_status(200)
            .with_body("event:one\ndata:One\n\n")
            .expect_at_least(1)
            .match_header(LAUNCHDARKLY_TAGS_HEADER, matcher)
            .create_async()
            .await;

        let (shutdown_tx, _) = broadcast::channel::<()>(1);
        let initialized = Arc::new(AtomicBool::new(false));

        let streaming = StreamingDataSource::new(
            &server.url(),
            "sdk-key",
            Duration::from_secs(0),
            &tag,
            HttpConnector::new(),
        )
        .unwrap();

        let data_store = Arc::new(RwLock::new(InMemoryDataStore::new()));

        let init_state = initialized.clone();
        streaming.subscribe(
            data_store,
            Arc::new(move |success| init_state.store(success, Ordering::SeqCst)),
            Arc::new(move |_ev| {}),
            shutdown_tx.subscribe(),
        );

        // The mocked event type is not one the data source accepts, so the flag is never set
        // to true here; the wait simply gives the request time to reach the mock server.
        wait_for_init(&initialized).await;

        let _ = shutdown_tx.send(());
        mock.assert();
    }

    /// The polling request carries the tags header exactly when tags were configured.
    #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")]
    #[test_case(None, Matcher::Missing)]
    #[tokio::test(flavor = "multi_thread")]
    async fn polling_source_passes_along_tags_header(
        tag: Option<String>,
        matcher: impl Into<Matcher>,
    ) {
        let mut server = mockito::Server::new_async().await;
        let mock = server
            .mock("GET", "/sdk/latest-all")
            .with_status(200)
            .with_body("{}")
            .expect_at_least(1)
            .match_header(LAUNCHDARKLY_TAGS_HEADER, matcher)
            .create_async()
            .await;

        let (shutdown_tx, _) = broadcast::channel::<()>(1);
        let initialized = Arc::new(AtomicBool::new(false));

        let hyper_builder =
            HyperFeatureRequesterBuilder::new(&server.url(), "sdk-key", HttpConnector::new());

        let polling = PollingDataSource::new(
            Arc::new(Mutex::new(Box::new(hyper_builder))),
            Duration::from_secs(10),
            tag,
        );

        let data_store = Arc::new(RwLock::new(InMemoryDataStore::new()));

        let init_state = initialized.clone();
        polling.subscribe(
            data_store,
            Arc::new(move |success| init_state.store(success, Ordering::SeqCst)),
            Arc::new(move |_ev| {}),
            shutdown_tx.subscribe(),
        );

        wait_for_init(&initialized).await;

        let _ = shutdown_tx.send(());
        mock.assert();
    }
}