launchdarkly_server_sdk/
data_source.rs

1use super::stores::store_types::{AllData, DataKind, PatchTarget, StorageItem};
2use crate::feature_requester::FeatureRequesterError;
3use crate::feature_requester_builders::FeatureRequesterFactory;
4use crate::reqwest::is_http_error_recoverable;
5use crate::stores::store::{DataStore, UpdateError};
6use crate::LAUNCHDARKLY_TAGS_HEADER;
7use es::{Client, ClientBuilder, ReconnectOptionsBuilder};
8use eventsource_client as es;
9use futures::StreamExt;
10use hyper::client::connect::Connection;
11use hyper::service::Service;
12use hyper::Uri;
13use launchdarkly_server_sdk_evaluation::{Flag, Segment};
14use parking_lot::RwLock;
15use serde::Deserialize;
16use std::sync::{Arc, Mutex, Once};
17use std::time::Duration;
18use tokio::io::{AsyncRead, AsyncWrite};
19use tokio::sync::broadcast;
20use tokio::time;
21use tokio_stream::wrappers::{BroadcastStream, IntervalStream};
22
/// Path prefix that marks a `patch`/`delete` event as targeting a flag; the remainder of the
/// path is used as the flag key.
const FLAGS_PREFIX: &str = "/flags/";
/// Path prefix that marks a `patch`/`delete` event as targeting a segment; the remainder of the
/// path is used as the segment key.
const SEGMENTS_PREFIX: &str = "/segments/";
25
/// Errors that can occur while applying a streamed event to the data store.
#[derive(Debug)]
#[allow(clippy::enum_variant_names, dead_code)]
pub enum Error {
    /// The event payload could not be deserialized from JSON into the expected structure.
    InvalidEventData {
        /// Type of the event whose payload failed to parse.
        event_type: String,
        /// The underlying deserialization error.
        error: Box<dyn std::error::Error + Send>,
    },
    /// The event's path was not acceptable: a `put` path other than `/` or empty, or a
    /// `patch`/`delete` path that starts with neither the flag nor the segment prefix.
    InvalidPath(String),
    /// The data store rejected the update.
    InvalidUpdate(UpdateError),
    /// The event type was something other than `put`, `patch` or `delete`.
    InvalidEventType(String),
}
37
/// Result alias whose error type is this module's [`Error`].
pub type Result<T> = std::result::Result<T, Error>;
39
40#[derive(Deserialize)]
41pub(crate) struct PutData {
42    #[serde(default = "String::default")]
43    path: String,
44    data: AllData<Flag, Segment>,
45}
46
/// Payload of a `patch` event: a single item to insert or update.
#[derive(Deserialize)]
pub(crate) struct PatchData {
    /// Path identifying the item; expected to start with the flag or segment prefix.
    pub path: String,
    /// The new item contents.
    pub data: PatchTarget,
}
52
/// Payload of a `delete` event: a single item to remove.
#[derive(Deserialize)]
pub(crate) struct DeleteData {
    /// Path identifying the item; expected to start with the flag or segment prefix.
    path: String,
    /// Version recorded in the tombstone that replaces the item.
    version: u64,
}
58
/// Callback given a reference to each server-sent-events item a data source receives.
pub type EventReceived = Arc<dyn Fn(&es::SSE) + Send + Sync>;
60
/// Trait for the component that obtains feature flag data in some way and passes it to a data
/// store. The built-in implementations of this are the client's standard streaming or polling
/// behavior.
pub trait DataSource: Send + Sync {
    /// Starts delivering data to `data_store`.
    ///
    /// * `data_store` - shared store that receives the data.
    /// * `init_complete` - callback told whether initialization succeeded (`true`) or failed
    ///   (`false`).
    /// * `event_received` - callback handed each raw SSE item; of the implementations in this
    ///   file only the streaming one invokes it.
    /// * `shutdown_receiver` - broadcast channel; the streaming and polling implementations stop
    ///   their background task once it signals.
    fn subscribe(
        &self,
        data_store: Arc<RwLock<dyn DataStore>>,
        init_complete: Arc<dyn Fn(bool) + Send + Sync>,
        event_received: EventReceived,
        shutdown_receiver: broadcast::Receiver<()>,
    );
}
73
/// Data source that reads a server-sent-events stream and applies its `put`, `patch` and
/// `delete` events to the data store.
pub struct StreamingDataSource {
    /// Event source client the event stream is obtained from.
    es_client: Box<dyn Client>,
}
77
78impl StreamingDataSource {
79    pub fn new<C>(
80        base_url: &str,
81        sdk_key: &str,
82        initial_reconnect_delay: Duration,
83        tags: &Option<String>,
84        connector: C,
85    ) -> std::result::Result<Self, es::Error>
86    where
87        C: Service<Uri> + Clone + Send + Sync + 'static,
88        C::Response: Connection + AsyncRead + AsyncWrite + Send + Unpin,
89        C::Future: Send + 'static,
90        C::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
91    {
92        let stream_url = format!("{}/all", base_url);
93
94        let client_builder = ClientBuilder::for_url(&stream_url)?;
95        let mut client_builder = client_builder
96            .reconnect(
97                ReconnectOptionsBuilder::new(true)
98                    .retry_initial(true)
99                    .delay(initial_reconnect_delay)
100                    .delay_max(Duration::from_secs(30))
101                    .build(),
102            )
103            .connect_timeout(Duration::from_secs(10))
104            .read_timeout(Duration::from_secs(300)) // LaunchDarkly sends keepalives every 3m
105            .header("Authorization", sdk_key)?
106            .header("User-Agent", &crate::USER_AGENT)?;
107
108        if let Some(tags) = tags {
109            client_builder = client_builder.header(LAUNCHDARKLY_TAGS_HEADER, tags)?;
110        }
111
112        Ok(Self {
113            es_client: Box::new(client_builder.build_with_conn(connector)),
114        })
115    }
116}
117
118impl DataSource for StreamingDataSource {
119    fn subscribe(
120        &self,
121        data_store: Arc<RwLock<dyn DataStore>>,
122        init_complete: Arc<dyn Fn(bool) + Send + Sync>,
123        event_received: EventReceived,
124        shutdown_receiver: broadcast::Receiver<()>,
125    ) {
126        let mut event_stream = self.es_client.stream().fuse();
127
128        tokio::spawn(async move {
129            let shutdown_stream = BroadcastStream::new(shutdown_receiver);
130            let mut shutdown_future = shutdown_stream.into_future();
131            let notify_init = Once::new();
132            let mut init_success = true;
133
134            loop {
135                futures::select! {
136                    _ = shutdown_future => break,
137                    event = event_stream.next() => {
138                        let event = match event {
139                            Some(Ok(event)) => {
140                                event_received(&event);
141                                match event {
142                                    es::SSE::Connected(_) => {
143                                        debug!("data source connected");
144                                        continue;
145                                    },
146                                    es::SSE::Comment(str)=> {
147                                        debug!("data source got a comment: {}", str);
148                                        continue;
149                                    },
150                                    es::SSE::Event(ev) => ev,
151                                }
152                            },
153                            Some(Err(es::Error::UnexpectedResponse(response, _))) => {
154                                match is_http_error_recoverable(response.status()) {
155                                    true => continue,
156                                    _ => {
157                                        notify_init.call_once(|| (init_complete)(false));
158                                        warn!("Returned unrecoverable failure. Unexpected response {}", response.status());
159                                        break
160                                    }
161                                }
162                            },
163                            Some(Err(e)) => {
164                                warn!("error on event stream: {:?}; assuming event stream will reconnect", e);
165                                continue;
166                            },
167                            None => {
168                                // NOTE(benesch): At the time of writing, the underlying event
169                                // source client will never return `None`. Something is seriously
170                                // wrong if we get here, so we loudly error.
171                                //
172                                // We don't attempt to retry, though, as the underlying event source
173                                // client already has retry logic, and we don't want to write that
174                                // retry logic twice. Better to fix the bugs in the underlying
175                                // client's retry logic.
176                                error!("unexpected end of event stream; terminating sync task; launchdarkly sync is now broken!");
177                                break;
178                            }
179                        };
180
181                        let data_store = data_store.clone();
182                        let mut data_store = data_store.write();
183
184                        debug!("data source got an event: {}", event.event_type);
185
186                        let stored = match event.event_type.as_str() {
187                            "put" => process_put(&mut *data_store, event),
188                            "patch" => process_patch(&mut *data_store, event),
189                            "delete" => process_delete(&mut *data_store, event),
190                            _ => Err(Error::InvalidEventType(event.event_type)),
191                        };
192                        if let Err(e) = stored {
193                            init_success = false;
194                            error!("error processing update: {:?}", e);
195                        }
196
197                        notify_init.call_once(|| (init_complete)(init_success));
198                    },
199                }
200            }
201        });
202    }
203}
204
/// Data source that periodically requests the complete data set and loads it into the data
/// store.
pub struct PollingDataSource {
    /// Factory used to build the feature requester when `subscribe` is called.
    feature_requester_factory: Arc<Mutex<Box<dyn FeatureRequesterFactory>>>,
    /// Period of the polling interval.
    poll_interval: Duration,
    /// Optional tags handed to the factory when building the requester.
    tags: Option<String>,
}
210
211impl PollingDataSource {
212    pub fn new(
213        feature_requester_factory: Arc<Mutex<Box<dyn FeatureRequesterFactory>>>,
214        poll_interval: Duration,
215        tags: Option<String>,
216    ) -> Self {
217        Self {
218            feature_requester_factory,
219            poll_interval,
220            tags,
221        }
222    }
223}
224
225impl DataSource for PollingDataSource {
226    fn subscribe(
227        &self,
228        data_store: Arc<RwLock<dyn DataStore>>,
229        init_complete: Arc<dyn Fn(bool) + Send + Sync>,
230        _event_received: EventReceived,
231        shutdown_receiver: broadcast::Receiver<()>,
232    ) {
233        let mut feature_requester = match self.feature_requester_factory.lock() {
234            Ok(factory) => match factory.build(self.tags.clone()) {
235                Ok(requester) => requester,
236                Err(e) => {
237                    error!("{:?}", e);
238                    return;
239                }
240            },
241            Err(e) => {
242                error!("{:?}", e);
243                return;
244            }
245        };
246
247        let poll_interval = self.poll_interval;
248        tokio::spawn(async move {
249            let notify_init = Once::new();
250
251            let mut interval = IntervalStream::new(time::interval(poll_interval)).fuse();
252
253            let shutdown_stream = BroadcastStream::new(shutdown_receiver);
254            let mut shutdown_future = shutdown_stream.into_future();
255
256            loop {
257                futures::select! {
258                    _ = interval.next() => {
259                        match feature_requester.get_all().await {
260                            Ok(all_data) => {
261                                let mut data_store = data_store.write();
262                                data_store.init(all_data);
263                                notify_init.call_once(|| init_complete(true));
264                            }
265                            Err(FeatureRequesterError::Temporary) => {
266                                warn!("feature requester has returned a temporary failure");
267                            }
268                            Err(FeatureRequesterError::Permanent) => {
269                                error!("feature requester has returned a permanent failure");
270                                notify_init.call_once(|| init_complete(false));
271                                break;
272                            }
273                        };
274                    },
275                    _ = shutdown_future => break
276                }
277            }
278        });
279    }
280}
281
/// Data source that does nothing when subscribed to.
pub struct NullDataSource {}
283
284impl NullDataSource {
285    pub fn new() -> Self {
286        Self {}
287    }
288}
289
impl DataSource for NullDataSource {
    /// Intentionally empty: no data is fetched, no task is spawned, and none of the callbacks
    /// (including `_init_complete`) is ever invoked.
    fn subscribe(
        &self,
        _datastore: Arc<RwLock<dyn DataStore>>,
        _init_complete: Arc<dyn Fn(bool) + Send + Sync>,
        _event_received: EventReceived,
        _shutdown_receiver: broadcast::Receiver<()>,
    ) {
    }
}
300
/// Test-only data source that reports successful initialization, optionally after a delay.
#[cfg(test)]
pub(crate) struct MockDataSource {
    /// Milliseconds to wait before reporting initialization; `0` reports it immediately.
    delay_init: u64,
}
305
#[cfg(test)]
impl MockDataSource {
    /// Creates a mock that reports initialization after `delay_init` milliseconds.
    pub fn new_with_init_delay(delay_init: u64) -> Self {
        Self { delay_init }
    }
}
312
#[cfg(test)]
impl DataSource for MockDataSource {
    /// Reports successful initialization and does nothing else.
    ///
    /// With a zero delay `init_complete(true)` is called synchronously; otherwise a task is
    /// spawned that sleeps for the configured number of milliseconds before calling it.
    fn subscribe(
        &self,
        _datastore: Arc<RwLock<dyn DataStore>>,
        init_complete: Arc<dyn Fn(bool) + Send + Sync>,
        _event_received: EventReceived,
        _shutdown_receiver: broadcast::Receiver<()>,
    ) {
        let delay_init = self.delay_init;

        // No delay requested: report right away on the caller's thread.
        if delay_init == 0 {
            (init_complete)(true);
            return;
        }

        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(delay_init)).await;
            (init_complete)(true);
        });
    }
}
333
334fn parse_event_data<'a, T: Deserialize<'a>>(event: &'a es::Event) -> Result<T> {
335    serde_json::from_slice(event.data.as_ref()).map_err(|e| Error::InvalidEventData {
336        event_type: event.event_type.clone(),
337        error: Box::new(e),
338    })
339}
340
341fn process_put(data_store: &mut dyn DataStore, event: es::Event) -> Result<()> {
342    let put: PutData = parse_event_data(&event)?;
343    if put.path == "/" || put.path.is_empty() {
344        data_store.init(put.data);
345        Ok(())
346    } else {
347        Err(Error::InvalidPath(put.path))
348    }
349}
350
351fn process_patch(data_store: &mut dyn DataStore, event: es::Event) -> Result<()> {
352    let patch: PatchData = parse_event_data(&event)?;
353    let (_, key) = path_to_key(&patch.path)?;
354
355    data_store
356        .upsert(key, patch.data)
357        .map_err(Error::InvalidUpdate)
358}
359
360fn process_delete(data_store: &mut dyn DataStore, event: es::Event) -> Result<()> {
361    let delete: DeleteData = parse_event_data(&event)?;
362    let (kind, key) = path_to_key(&delete.path)?;
363    let target = match kind {
364        DataKind::Flag => PatchTarget::Flag(StorageItem::Tombstone(delete.version)),
365        DataKind::Segment => PatchTarget::Segment(StorageItem::Tombstone(delete.version)),
366    };
367
368    data_store.upsert(key, target).map_err(Error::InvalidUpdate)
369}
370
371fn path_to_key(path: &str) -> Result<(DataKind, &str)> {
372    if let Some(flag_key) = path.strip_prefix(FLAGS_PREFIX) {
373        Ok((DataKind::Flag, flag_key))
374    } else if let Some(segment_key) = path.strip_prefix(SEGMENTS_PREFIX) {
375        Ok((DataKind::Segment, segment_key))
376    } else {
377        Err(Error::InvalidPath(path.to_string()))
378    }
379}
380
#[cfg(test)]
mod tests {
    use std::sync::Mutex;
    use std::{
        sync::{
            atomic::{AtomicBool, Ordering},
            Arc,
        },
        time::Duration,
    };

    use hyper::client::HttpConnector;
    use mockito::Matcher;
    use parking_lot::RwLock;
    use test_case::test_case;
    use tokio::sync::broadcast;

    use super::{DataSource, PollingDataSource, StreamingDataSource};
    use crate::feature_requester_builders::HyperFeatureRequesterBuilder;
    use crate::{stores::store::InMemoryDataStore, LAUNCHDARKLY_TAGS_HEADER};

    /// Waits for `initialized` to become `true`, checking every 100ms and giving up after ten
    /// attempts.
    ///
    /// Uses the async timer rather than `std::thread::sleep` so the test does not block a
    /// runtime thread while the data source task is making its request.
    async fn wait_for_init(initialized: &AtomicBool) {
        for _ in 0..10 {
            if initialized.load(Ordering::SeqCst) {
                return;
            }

            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }

    #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")]
    #[test_case(None, Matcher::Missing)]
    #[tokio::test(flavor = "multi_thread")]
    async fn streaming_source_passes_along_tags_header(
        tag: Option<String>,
        matcher: impl Into<Matcher>,
    ) {
        let mut server = mockito::Server::new_async().await;
        let mock = server
            .mock("GET", "/all")
            .with_status(200)
            .with_body("event:one\ndata:One\n\n")
            .expect_at_least(1)
            .match_header(LAUNCHDARKLY_TAGS_HEADER, matcher)
            .create_async()
            .await;

        let (shutdown_tx, _) = broadcast::channel::<()>(1);
        let initialized = Arc::new(AtomicBool::new(false));

        let streaming = StreamingDataSource::new(
            &server.url(),
            "sdk-key",
            Duration::from_secs(0),
            &tag,
            HttpConnector::new(),
        )
        .unwrap();

        let data_store = Arc::new(RwLock::new(InMemoryDataStore::new()));

        let init_state = initialized.clone();
        streaming.subscribe(
            data_store,
            Arc::new(move |success| init_state.store(success, Ordering::SeqCst)),
            Arc::new(move |_ev| {}),
            shutdown_tx.subscribe(),
        );

        wait_for_init(&initialized).await;

        let _ = shutdown_tx.send(());
        mock.assert()
    }

    #[test_case(Some("application-id/abc:application-sha/xyz".into()), "application-id/abc:application-sha/xyz")]
    #[test_case(None, Matcher::Missing)]
    #[tokio::test(flavor = "multi_thread")]
    async fn polling_source_passes_along_tags_header(
        tag: Option<String>,
        matcher: impl Into<Matcher>,
    ) {
        let mut server = mockito::Server::new_async().await;
        let mock = server
            .mock("GET", "/sdk/latest-all")
            .with_status(200)
            .with_body("{}")
            .expect_at_least(1)
            .match_header(LAUNCHDARKLY_TAGS_HEADER, matcher)
            .create_async()
            .await;

        let (shutdown_tx, _) = broadcast::channel::<()>(1);
        let initialized = Arc::new(AtomicBool::new(false));

        let hyper_builder =
            HyperFeatureRequesterBuilder::new(&server.url(), "sdk-key", HttpConnector::new());

        let polling = PollingDataSource::new(
            Arc::new(Mutex::new(Box::new(hyper_builder))),
            Duration::from_secs(10),
            tag,
        );

        let data_store = Arc::new(RwLock::new(InMemoryDataStore::new()));

        let init_state = initialized.clone();
        polling.subscribe(
            data_store,
            Arc::new(move |success| init_state.store(success, Ordering::SeqCst)),
            Arc::new(move |_ev| {}),
            shutdown_tx.subscribe(),
        );

        wait_for_init(&initialized).await;

        let _ = shutdown_tx.send(());

        mock.assert()
    }
}