1use eval::Context;
2use parking_lot::RwLock;
3use std::io;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use tokio::runtime::Runtime;
8
9use launchdarkly_server_sdk_evaluation::{self as eval, Detail, FlagValue, PrerequisiteEvent};
10use serde::Serialize;
11use thiserror::Error;
12use tokio::sync::{broadcast, Semaphore};
13
14use super::config::Config;
15use super::data_source::{DataSource, EventReceived};
16use super::data_source_builders::BuildError as DataSourceError;
17use super::evaluation::{FlagDetail, FlagDetailConfig};
18use super::stores::store::DataStore;
19use super::stores::store_builders::BuildError as DataStoreError;
20use crate::config::BuildError as ConfigBuildError;
21use crate::events::event::EventFactory;
22use crate::events::event::InputEvent;
23use crate::events::processor::EventProcessor;
24use crate::events::processor_builders::BuildError as EventProcessorError;
25use crate::{MigrationOpTracker, Stage};
26
27struct EventsScope {
28 disabled: bool,
29 event_factory: EventFactory,
30 prerequisite_event_recorder: Box<dyn eval::PrerequisiteEventRecorder + Send + Sync>,
31}
32
33struct PrerequisiteEventRecorder {
34 event_factory: EventFactory,
35 event_processor: Arc<dyn EventProcessor>,
36}
37
38impl eval::PrerequisiteEventRecorder for PrerequisiteEventRecorder {
39 fn record(&self, event: PrerequisiteEvent) {
40 let evt = self.event_factory.new_eval_event(
41 &event.prerequisite_flag.key,
42 event.context.clone(),
43 &event.prerequisite_flag,
44 event.prerequisite_result,
45 FlagValue::Json(serde_json::Value::Null),
46 Some(event.target_flag_key),
47 );
48
49 self.event_processor.send(evt);
50 }
51}
52
53#[non_exhaustive]
55#[derive(Debug, Error)]
56pub enum BuildError {
57 #[error("invalid client config: {0}")]
59 InvalidConfig(String),
60}
61
62impl From<DataSourceError> for BuildError {
63 fn from(error: DataSourceError) -> Self {
64 Self::InvalidConfig(error.to_string())
65 }
66}
67
68impl From<DataStoreError> for BuildError {
69 fn from(error: DataStoreError) -> Self {
70 Self::InvalidConfig(error.to_string())
71 }
72}
73
74impl From<EventProcessorError> for BuildError {
75 fn from(error: EventProcessorError) -> Self {
76 Self::InvalidConfig(error.to_string())
77 }
78}
79
80impl From<ConfigBuildError> for BuildError {
81 fn from(error: ConfigBuildError) -> Self {
82 Self::InvalidConfig(error.to_string())
83 }
84}
85
86#[non_exhaustive]
88#[derive(Debug, Error)]
89pub enum StartError {
90 #[error("couldn't spawn background thread for client: {0}")]
92 SpawnFailed(io::Error),
93}
94
95#[derive(PartialEq, Copy, Clone, Debug)]
96enum ClientInitState {
97 Initializing = 0,
98 Initialized = 1,
99 InitializationFailed = 2,
100}
101
102impl PartialEq<usize> for ClientInitState {
103 fn eq(&self, other: &usize) -> bool {
104 *self as usize == *other
105 }
106}
107
108impl From<usize> for ClientInitState {
109 fn from(val: usize) -> Self {
110 match val {
111 0 => ClientInitState::Initializing,
112 1 => ClientInitState::Initialized,
113 2 => ClientInitState::InitializationFailed,
114 _ => unreachable!(),
115 }
116 }
117}
118
119pub struct Client {
149 event_processor: Arc<dyn EventProcessor>,
150 data_source: Arc<dyn DataSource>,
151 data_store: Arc<RwLock<dyn DataStore>>,
152 events_default: EventsScope,
153 events_with_reasons: EventsScope,
154 init_notify: Arc<Semaphore>,
155 init_state: Arc<AtomicUsize>,
156 started: AtomicBool,
157 offline: bool,
158 daemon_mode: bool,
159 sdk_key: String,
160 shutdown_broadcast: broadcast::Sender<()>,
161 runtime: RwLock<Option<Runtime>>,
162}
163
164impl Client {
165 pub fn build(config: Config) -> Result<Self, BuildError> {
167 if config.offline() {
168 info!("Started LaunchDarkly Client in offline mode");
169 } else if config.daemon_mode() {
170 info!("Started LaunchDarkly Client in daemon mode");
171 }
172
173 let tags = config.application_tag();
174
175 let endpoints = config.service_endpoints_builder().build()?;
176 let event_processor =
177 config
178 .event_processor_builder()
179 .build(&endpoints, config.sdk_key(), tags.clone())?;
180 let data_source =
181 config
182 .data_source_builder()
183 .build(&endpoints, config.sdk_key(), tags.clone())?;
184 let data_store = config.data_store_builder().build()?;
185
186 let events_default = EventsScope {
187 disabled: config.offline(),
188 event_factory: EventFactory::new(false),
189 prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
190 event_factory: EventFactory::new(false),
191 event_processor: event_processor.clone(),
192 }),
193 };
194
195 let events_with_reasons = EventsScope {
196 disabled: config.offline(),
197 event_factory: EventFactory::new(true),
198 prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
199 event_factory: EventFactory::new(true),
200 event_processor: event_processor.clone(),
201 }),
202 };
203
204 let (shutdown_tx, _) = broadcast::channel(1);
205
206 Ok(Client {
207 event_processor,
208 data_source,
209 data_store,
210 events_default,
211 events_with_reasons,
212 init_notify: Arc::new(Semaphore::new(0)),
213 init_state: Arc::new(AtomicUsize::new(ClientInitState::Initializing as usize)),
214 started: AtomicBool::new(false),
215 offline: config.offline(),
216 daemon_mode: config.daemon_mode(),
217 sdk_key: config.sdk_key().into(),
218 shutdown_broadcast: shutdown_tx,
219 runtime: RwLock::new(None),
220 })
221 }
222
223 pub fn start_with_default_executor_and_callback(&self, event_received: EventReceived) {
227 if self.started.load(Ordering::SeqCst) {
228 return;
229 }
230 self.started.store(true, Ordering::SeqCst);
231 self.start_with_default_executor_internal(event_received);
232 }
233
234 pub fn start_with_default_executor(&self) {
236 if self.started.load(Ordering::SeqCst) {
237 return;
238 }
239 self.started.store(true, Ordering::SeqCst);
240 self.start_with_default_executor_internal(Arc::new(move |_ev| {}));
241 }
242
243 fn start_with_default_executor_internal(&self, event_received: EventReceived) {
244 let notify = self.init_notify.clone();
248 let init_state = self.init_state.clone();
249
250 self.data_source.subscribe(
251 self.data_store.clone(),
252 Arc::new(move |success| {
253 init_state.store(
254 (if success {
255 ClientInitState::Initialized
256 } else {
257 ClientInitState::InitializationFailed
258 }) as usize,
259 Ordering::SeqCst,
260 );
261 notify.add_permits(1);
262 }),
263 event_received,
264 self.shutdown_broadcast.subscribe(),
265 );
266 }
267
268 pub fn start_with_runtime(&self) -> Result<bool, StartError> {
274 if self.started.load(Ordering::SeqCst) {
275 return Ok(true);
276 }
277 self.started.store(true, Ordering::SeqCst);
278
279 let runtime = Runtime::new().map_err(StartError::SpawnFailed)?;
280 let _guard = runtime.enter();
281 self.runtime.write().replace(runtime);
282
283 self.start_with_default_executor_internal(Arc::new(move |_ev| {}));
284
285 Ok(true)
286 }
287
288 #[deprecated(
292 note = "blocking without a timeout is discouraged, use wait_for_initialization instead"
293 )]
294 pub async fn initialized_async(&self) -> bool {
295 self.initialized_async_internal().await
296 }
297
298 pub async fn wait_for_initialization(&self, timeout: Duration) -> Option<bool> {
304 if timeout > Duration::from_secs(60) {
305 warn!("wait_for_initialization was configured to block for up to {} seconds. We recommend blocking no longer than 60 seconds.", timeout.as_secs());
306 }
307
308 let initialized = tokio::time::timeout(timeout, self.initialized_async_internal()).await;
309 match initialized {
310 Ok(result) => Some(result),
311 Err(_) => None,
312 }
313 }
314
315 async fn initialized_async_internal(&self) -> bool {
316 if self.offline || self.daemon_mode {
317 return true;
318 }
319
320 if ClientInitState::Initialized != self.init_state.load(Ordering::SeqCst) {
326 let _permit = self.init_notify.acquire().await;
327 }
328 ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
329 }
330
331 pub fn initialized(&self) -> bool {
335 self.offline
336 || self.daemon_mode
337 || ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
338 }
339
340 pub fn close(&self) {
344 self.event_processor.close();
345
346 if !self.offline && !self.daemon_mode {
349 if let Err(e) = self.shutdown_broadcast.send(()) {
350 error!("Failed to shutdown client appropriately: {}", e);
351 }
352 }
353
354 self.runtime.write().take();
357 }
358
359 pub fn flush(&self) {
367 self.event_processor.flush();
368 }
369
370 pub fn identify(&self, context: Context) {
375 if self.events_default.disabled {
376 return;
377 }
378
379 self.send_internal(self.events_default.event_factory.new_identify(context));
380 }
381
382 pub fn bool_variation(&self, context: &Context, flag_key: &str, default: bool) -> bool {
390 let val = self.variation(context, flag_key, default);
391 if let Some(b) = val.as_bool() {
392 b
393 } else {
394 warn!(
395 "bool_variation called for a non-bool flag {:?} (got {:?})",
396 flag_key, val
397 );
398 default
399 }
400 }
401
402 pub fn str_variation(&self, context: &Context, flag_key: &str, default: String) -> String {
410 let val = self.variation(context, flag_key, default.clone());
411 if let Some(s) = val.as_string() {
412 s
413 } else {
414 warn!(
415 "str_variation called for a non-string flag {:?} (got {:?})",
416 flag_key, val
417 );
418 default
419 }
420 }
421
422 pub fn float_variation(&self, context: &Context, flag_key: &str, default: f64) -> f64 {
430 let val = self.variation(context, flag_key, default);
431 if let Some(f) = val.as_float() {
432 f
433 } else {
434 warn!(
435 "float_variation called for a non-float flag {:?} (got {:?})",
436 flag_key, val
437 );
438 default
439 }
440 }
441
442 pub fn int_variation(&self, context: &Context, flag_key: &str, default: i64) -> i64 {
450 let val = self.variation(context, flag_key, default);
451 if let Some(f) = val.as_int() {
452 f
453 } else {
454 warn!(
455 "int_variation called for a non-int flag {:?} (got {:?})",
456 flag_key, val
457 );
458 default
459 }
460 }
461
462 pub fn json_variation(
472 &self,
473 context: &Context,
474 flag_key: &str,
475 default: serde_json::Value,
476 ) -> serde_json::Value {
477 self.variation(context, flag_key, default.clone())
478 .as_json()
479 .unwrap_or(default)
480 }
481
482 pub fn bool_variation_detail(
489 &self,
490 context: &Context,
491 flag_key: &str,
492 default: bool,
493 ) -> Detail<bool> {
494 self.variation_detail(context, flag_key, default).try_map(
495 |val| val.as_bool(),
496 default,
497 eval::Error::WrongType,
498 )
499 }
500
501 pub fn str_variation_detail(
508 &self,
509 context: &Context,
510 flag_key: &str,
511 default: String,
512 ) -> Detail<String> {
513 self.variation_detail(context, flag_key, default.clone())
514 .try_map(|val| val.as_string(), default, eval::Error::WrongType)
515 }
516
517 pub fn float_variation_detail(
524 &self,
525 context: &Context,
526 flag_key: &str,
527 default: f64,
528 ) -> Detail<f64> {
529 self.variation_detail(context, flag_key, default).try_map(
530 |val| val.as_float(),
531 default,
532 eval::Error::WrongType,
533 )
534 }
535
536 pub fn int_variation_detail(
543 &self,
544 context: &Context,
545 flag_key: &str,
546 default: i64,
547 ) -> Detail<i64> {
548 self.variation_detail(context, flag_key, default).try_map(
549 |val| val.as_int(),
550 default,
551 eval::Error::WrongType,
552 )
553 }
554
555 pub fn json_variation_detail(
562 &self,
563 context: &Context,
564 flag_key: &str,
565 default: serde_json::Value,
566 ) -> Detail<serde_json::Value> {
567 self.variation_detail(context, flag_key, default.clone())
568 .try_map(|val| val.as_json(), default, eval::Error::WrongType)
569 }
570
571 pub fn secure_mode_hash(&self, context: &Context) -> String {
576 let key = ring::hmac::Key::new(ring::hmac::HMAC_SHA256, self.sdk_key.as_bytes());
577 let tag = ring::hmac::sign(&key, context.canonical_key().as_bytes());
578
579 data_encoding::HEXLOWER.encode(tag.as_ref())
580 }
581
582 pub fn all_flags_detail(
593 &self,
594 context: &Context,
595 flag_state_config: FlagDetailConfig,
596 ) -> FlagDetail {
597 if self.offline {
598 warn!(
599 "all_flags_detail() called, but client is in offline mode. Returning empty state"
600 );
601 return FlagDetail::new(false);
602 }
603
604 if !self.initialized() {
605 warn!("all_flags_detail() called before client has finished initializing! Feature store unavailable - returning empty state");
606 return FlagDetail::new(false);
607 }
608
609 let data_store = self.data_store.read();
610
611 let mut flag_detail = FlagDetail::new(true);
612 flag_detail.populate(&*data_store, context, flag_state_config);
613
614 flag_detail
615 }
616
617 pub fn variation_detail<T: Into<FlagValue> + Clone>(
623 &self,
624 context: &Context,
625 flag_key: &str,
626 default: T,
627 ) -> Detail<FlagValue> {
628 let (detail, _) =
629 self.variation_internal(context, flag_key, default, &self.events_with_reasons);
630 detail
631 }
632
633 pub fn variation<T: Into<FlagValue> + Clone>(
644 &self,
645 context: &Context,
646 flag_key: &str,
647 default: T,
648 ) -> FlagValue {
649 let (detail, _) = self.variation_internal(context, flag_key, default, &self.events_default);
650 detail.value.unwrap()
651 }
652
653 pub fn migration_variation(
658 &self,
659 context: &Context,
660 flag_key: &str,
661 default_stage: Stage,
662 ) -> (Stage, Arc<Mutex<MigrationOpTracker>>) {
663 let (detail, flag) =
664 self.variation_internal(context, flag_key, default_stage, &self.events_default);
665
666 let migration_detail =
667 detail.try_map(|v| v.try_into().ok(), default_stage, eval::Error::WrongType);
668 let tracker = MigrationOpTracker::new(
669 flag_key.into(),
670 flag,
671 context.clone(),
672 migration_detail.clone(),
673 default_stage,
674 );
675
676 (
677 migration_detail.value.unwrap_or(default_stage),
678 Arc::new(Mutex::new(tracker)),
679 )
680 }
681
682 pub fn track_event(&self, context: Context, key: impl Into<String>) {
692 let _ = self.track(context, key, None, serde_json::Value::Null);
693 }
694
695 pub fn track_data(
708 &self,
709 context: Context,
710 key: impl Into<String>,
711 data: impl Serialize,
712 ) -> serde_json::Result<()> {
713 self.track(context, key, None, data)
714 }
715
716 pub fn track_metric(
727 &self,
728 context: Context,
729 key: impl Into<String>,
730 value: f64,
731 data: impl Serialize,
732 ) {
733 let _ = self.track(context, key, Some(value), data);
734 }
735
736 fn track(
737 &self,
738 context: Context,
739 key: impl Into<String>,
740 metric_value: Option<f64>,
741 data: impl Serialize,
742 ) -> serde_json::Result<()> {
743 if !self.events_default.disabled {
744 let event =
745 self.events_default
746 .event_factory
747 .new_custom(context, key, metric_value, data)?;
748
749 self.send_internal(event);
750 }
751
752 Ok(())
753 }
754
755 pub fn track_migration_op(&self, tracker: Arc<Mutex<MigrationOpTracker>>) {
762 if self.events_default.disabled {
763 return;
764 }
765
766 match tracker.lock() {
767 Ok(tracker) => {
768 let event = tracker.build();
769 match event {
770 Ok(event) => {
771 self.send_internal(
772 self.events_default.event_factory.new_migration_op(event),
773 );
774 }
775 Err(e) => error!(
776 "Failed to build migration event, no event will be sent: {}",
777 e
778 ),
779 }
780 }
781 Err(e) => error!(
782 "Failed to lock migration tracker, no event will be sent: {}",
783 e
784 ),
785 }
786 }
787
788 fn variation_internal<T: Into<FlagValue> + Clone>(
789 &self,
790 context: &Context,
791 flag_key: &str,
792 default: T,
793 events_scope: &EventsScope,
794 ) -> (Detail<FlagValue>, Option<eval::Flag>) {
795 if self.offline {
796 return (
797 Detail::err_default(eval::Error::ClientNotReady, default.into()),
798 None,
799 );
800 }
801
802 let (flag, result) = match self.initialized() {
803 false => (
804 None,
805 Detail::err_default(eval::Error::ClientNotReady, default.clone().into()),
806 ),
807 true => {
808 let data_store = self.data_store.read();
809 match data_store.flag(flag_key) {
810 Some(flag) => {
811 let result = eval::evaluate(
812 data_store.to_store(),
813 &flag,
814 context,
815 Some(&*events_scope.prerequisite_event_recorder),
816 )
817 .map(|v| v.clone())
818 .or(default.clone().into());
819
820 (Some(flag), result)
821 }
822 None => (
823 None,
824 Detail::err_default(eval::Error::FlagNotFound, default.clone().into()),
825 ),
826 }
827 }
828 };
829
830 if !events_scope.disabled {
831 let event = match &flag {
832 Some(f) => events_scope.event_factory.new_eval_event(
833 flag_key,
834 context.clone(),
835 f,
836 result.clone(),
837 default.into(),
838 None,
839 ),
840 None => events_scope.event_factory.new_unknown_flag_event(
841 flag_key,
842 context.clone(),
843 result.clone(),
844 default.into(),
845 ),
846 };
847 self.send_internal(event);
848 }
849
850 (result, flag)
851 }
852
853 fn send_internal(&self, event: InputEvent) {
854 self.event_processor.send(event);
855 }
856}
857
858#[cfg(test)]
859mod tests {
860 use assert_json_diff::assert_json_eq;
861 use crossbeam_channel::Receiver;
862 use eval::{ContextBuilder, MultiContextBuilder};
863 use futures::FutureExt;
864 use hyper::client::HttpConnector;
865 use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment};
866 use maplit::hashmap;
867 use std::collections::HashMap;
868 use tokio::time::Instant;
869
870 use crate::data_source::MockDataSource;
871 use crate::data_source_builders::MockDataSourceBuilder;
872 use crate::events::create_event_sender;
873 use crate::events::event::{OutputEvent, VariationKey};
874 use crate::events::processor_builders::EventProcessorBuilder;
875 use crate::stores::persistent_store::tests::InMemoryPersistentDataStore;
876 use crate::stores::store_types::{PatchTarget, StorageItem};
877 use crate::test_common::{
878 self, basic_flag, basic_flag_with_prereq, basic_flag_with_prereqs_and_visibility,
879 basic_flag_with_visibility, basic_int_flag, basic_migration_flag, basic_off_flag,
880 };
881 use crate::{
882 AllData, ConfigBuilder, MigratorBuilder, NullEventProcessorBuilder, Operation, Origin,
883 PersistentDataStore, PersistentDataStoreBuilder, PersistentDataStoreFactory,
884 SerializedItem,
885 };
886 use test_case::test_case;
887
888 use super::*;
889
890 fn is_send_and_sync<T: Send + Sync>() {}
891
892 #[test]
893 fn ensure_client_is_send_and_sync() {
894 is_send_and_sync::<Client>()
895 }
896
897 #[tokio::test]
898 async fn client_asynchronously_initializes() {
899 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
900 client.start_with_default_executor();
901
902 let now = Instant::now();
903 let initialized = client.initialized_async().await;
904 let elapsed_time = now.elapsed();
905 assert!(initialized);
906 assert!(elapsed_time.as_millis() > 500)
908 }
909
910 #[tokio::test]
911 async fn client_asynchronously_initializes_within_timeout() {
912 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
913 client.start_with_default_executor();
914
915 let now = Instant::now();
916 let initialized = client
917 .wait_for_initialization(Duration::from_millis(1500))
918 .await;
919 let elapsed_time = now.elapsed();
920 assert!(elapsed_time.as_millis() > 500);
922 assert_eq!(initialized, Some(true));
923 }
924
925 #[tokio::test]
926 async fn client_asynchronously_initializes_slower_than_timeout() {
927 let (client, _event_rx) = make_mocked_client_with_delay(2000, false, false);
928 client.start_with_default_executor();
929
930 let now = Instant::now();
931 let initialized = client
932 .wait_for_initialization(Duration::from_millis(500))
933 .await;
934 let elapsed_time = now.elapsed();
935 assert!(elapsed_time.as_millis() < 750);
937 assert!(initialized.is_none());
938 }
939
940 #[tokio::test]
941 async fn client_initializes_immediately_in_offline_mode() {
942 let (client, _event_rx) = make_mocked_client_with_delay(1000, true, false);
943 client.start_with_default_executor();
944
945 assert!(client.initialized());
946
947 let now = Instant::now();
948 let initialized = client
949 .wait_for_initialization(Duration::from_millis(2000))
950 .await;
951 let elapsed_time = now.elapsed();
952 assert_eq!(initialized, Some(true));
953 assert!(elapsed_time.as_millis() < 500)
954 }
955
956 #[tokio::test]
957 async fn client_initializes_immediately_in_daemon_mode() {
958 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, true);
959 client.start_with_default_executor();
960
961 assert!(client.initialized());
962
963 let now = Instant::now();
964 let initialized = client
965 .wait_for_initialization(Duration::from_millis(2000))
966 .await;
967 let elapsed_time = now.elapsed();
968 assert_eq!(initialized, Some(true));
969 assert!(elapsed_time.as_millis() < 500)
970 }
971
972 #[test_case(basic_flag("myFlag"), false.into(), true.into())]
973 #[test_case(basic_int_flag("myFlag"), 0.into(), test_common::FLOAT_TO_INT_MAX.into())]
974 fn client_updates_changes_evaluation_results(
975 flag: eval::Flag,
976 default: FlagValue,
977 expected: FlagValue,
978 ) {
979 let context = ContextBuilder::new("foo")
980 .build()
981 .expect("Failed to create context");
982
983 let (client, _event_rx) = make_mocked_client();
984
985 let result = client.variation_detail(&context, "myFlag", default.clone());
986 assert_eq!(result.value.unwrap(), default);
987
988 client.start_with_default_executor();
989 client
990 .data_store
991 .write()
992 .upsert(
993 &flag.key,
994 PatchTarget::Flag(StorageItem::Item(flag.clone())),
995 )
996 .expect("patch should apply");
997
998 let result = client.variation_detail(&context, "myFlag", default);
999 assert_eq!(result.value.unwrap(), expected);
1000 assert!(matches!(
1001 result.reason,
1002 Reason::Fallthrough {
1003 in_experiment: false
1004 }
1005 ));
1006 }
1007
1008 #[test]
1009 fn all_flags_detail_is_invalid_when_offline() {
1010 let (client, _event_rx) = make_mocked_offline_client();
1011 client.start_with_default_executor();
1012
1013 let context = ContextBuilder::new("bob")
1014 .build()
1015 .expect("Failed to create context");
1016
1017 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1018 assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1019 }
1020
1021 #[test]
1022 fn all_flags_detail_is_invalid_when_not_initialized() {
1023 let (client, _event_rx) = make_mocked_client();
1024
1025 let context = ContextBuilder::new("bob")
1026 .build()
1027 .expect("Failed to create context");
1028
1029 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1030 assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1031 }
1032
1033 #[test]
1034 fn all_flags_detail_returns_flag_states() {
1035 let (client, _event_rx) = make_mocked_client();
1036 client.start_with_default_executor();
1037 client
1038 .data_store
1039 .write()
1040 .upsert(
1041 "myFlag1",
1042 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag1"))),
1043 )
1044 .expect("patch should apply");
1045 client
1046 .data_store
1047 .write()
1048 .upsert(
1049 "myFlag2",
1050 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag2"))),
1051 )
1052 .expect("patch should apply");
1053 let context = ContextBuilder::new("bob")
1054 .build()
1055 .expect("Failed to create context");
1056
1057 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1058
1059 client.close();
1060
1061 assert_json_eq!(
1062 all_flags,
1063 json!({
1064 "myFlag1": true,
1065 "myFlag2": true,
1066 "$flagsState": {
1067 "myFlag1": {
1068 "version": 42,
1069 "variation": 1
1070 },
1071 "myFlag2": {
1072 "version": 42,
1073 "variation": 1
1074 },
1075 },
1076 "$valid": true
1077 })
1078 );
1079 }
1080
1081 #[test]
1082 fn all_flags_detail_returns_prerequisite_relations() {
1083 let (client, _event_rx) = make_mocked_client();
1084 client.start_with_default_executor();
1085 client
1086 .data_store
1087 .write()
1088 .upsert(
1089 "prereq1",
1090 PatchTarget::Flag(StorageItem::Item(basic_flag("prereq1"))),
1091 )
1092 .expect("patch should apply");
1093 client
1094 .data_store
1095 .write()
1096 .upsert(
1097 "prereq2",
1098 PatchTarget::Flag(StorageItem::Item(basic_flag("prereq2"))),
1099 )
1100 .expect("patch should apply");
1101
1102 client
1103 .data_store
1104 .write()
1105 .upsert(
1106 "toplevel",
1107 PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1108 "toplevel",
1109 &["prereq1", "prereq2"],
1110 false,
1111 ))),
1112 )
1113 .expect("patch should apply");
1114
1115 let context = ContextBuilder::new("bob")
1116 .build()
1117 .expect("Failed to create context");
1118
1119 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1120
1121 client.close();
1122
1123 assert_json_eq!(
1124 all_flags,
1125 json!({
1126 "prereq1": true,
1127 "prereq2": true,
1128 "toplevel": true,
1129 "$flagsState": {
1130 "toplevel": {
1131 "version": 42,
1132 "variation": 1,
1133 "prerequisites": ["prereq1", "prereq2"]
1134 },
1135 "prereq1": {
1136 "version": 42,
1137 "variation": 1
1138 },
1139 "prereq2": {
1140 "version": 42,
1141 "variation": 1
1142 },
1143 },
1144 "$valid": true
1145 })
1146 );
1147 }
1148
1149 #[test]
1150 fn all_flags_detail_returns_prerequisite_relations_when_not_visible_to_clients() {
1151 let (client, _event_rx) = make_mocked_client();
1152 client.start_with_default_executor();
1153 client
1154 .data_store
1155 .write()
1156 .upsert(
1157 "prereq1",
1158 PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1159 "prereq1", false,
1160 ))),
1161 )
1162 .expect("patch should apply");
1163 client
1164 .data_store
1165 .write()
1166 .upsert(
1167 "prereq2",
1168 PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1169 "prereq2", false,
1170 ))),
1171 )
1172 .expect("patch should apply");
1173
1174 client
1175 .data_store
1176 .write()
1177 .upsert(
1178 "toplevel",
1179 PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1180 "toplevel",
1181 &["prereq1", "prereq2"],
1182 true,
1183 ))),
1184 )
1185 .expect("patch should apply");
1186
1187 let context = ContextBuilder::new("bob")
1188 .build()
1189 .expect("Failed to create context");
1190
1191 let mut config = FlagDetailConfig::new();
1192 config.client_side_only();
1193
1194 let all_flags = client.all_flags_detail(&context, config);
1195
1196 client.close();
1197
1198 assert_json_eq!(
1199 all_flags,
1200 json!({
1201 "toplevel": true,
1202 "$flagsState": {
1203 "toplevel": {
1204 "version": 42,
1205 "variation": 1,
1206 "prerequisites": ["prereq1", "prereq2"]
1207 },
1208 },
1209 "$valid": true
1210 })
1211 );
1212 }
1213
1214 #[test]
1215 fn variation_tracks_events_correctly() {
1216 let (client, event_rx) = make_mocked_client();
1217 client.start_with_default_executor();
1218 client
1219 .data_store
1220 .write()
1221 .upsert(
1222 "myFlag",
1223 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1224 )
1225 .expect("patch should apply");
1226 let context = ContextBuilder::new("bob")
1227 .build()
1228 .expect("Failed to create context");
1229
1230 let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1231
1232 assert!(flag_value.as_bool().unwrap());
1233 client.flush();
1234 client.close();
1235
1236 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1237 assert_eq!(events.len(), 2);
1238 assert_eq!(events[0].kind(), "index");
1239 assert_eq!(events[1].kind(), "summary");
1240
1241 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1242 let variation_key = VariationKey {
1243 version: Some(42),
1244 variation: Some(1),
1245 };
1246 let feature = event_summary.features.get("myFlag");
1247 assert!(feature.is_some());
1248
1249 let feature = feature.unwrap();
1250 assert!(feature.counters.contains_key(&variation_key));
1251 } else {
1252 panic!("Event should be a summary type");
1253 }
1254 }
1255
1256 #[test]
1257 fn variation_handles_offline_mode() {
1258 let (client, event_rx) = make_mocked_offline_client();
1259 client.start_with_default_executor();
1260
1261 let context = ContextBuilder::new("bob")
1262 .build()
1263 .expect("Failed to create context");
1264 let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1265
1266 assert!(!flag_value.as_bool().unwrap());
1267 client.flush();
1268 client.close();
1269
1270 assert_eq!(event_rx.iter().count(), 0);
1271 }
1272
1273 #[test]
1274 fn variation_handles_unknown_flags() {
1275 let (client, event_rx) = make_mocked_client();
1276 client.start_with_default_executor();
1277 let context = ContextBuilder::new("bob")
1278 .build()
1279 .expect("Failed to create context");
1280
1281 let flag_value = client.variation(&context, "non-existent-flag", FlagValue::Bool(false));
1282
1283 assert!(!flag_value.as_bool().unwrap());
1284 client.flush();
1285 client.close();
1286
1287 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1288 assert_eq!(events.len(), 2);
1289 assert_eq!(events[0].kind(), "index");
1290 assert_eq!(events[1].kind(), "summary");
1291
1292 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1293 let variation_key = VariationKey {
1294 version: None,
1295 variation: None,
1296 };
1297
1298 let feature = event_summary.features.get("non-existent-flag");
1299 assert!(feature.is_some());
1300
1301 let feature = feature.unwrap();
1302 assert!(feature.counters.contains_key(&variation_key));
1303 } else {
1304 panic!("Event should be a summary type");
1305 }
1306 }
1307
1308 #[test]
1309 fn variation_detail_handles_debug_events_correctly() {
1310 let (client, event_rx) = make_mocked_client();
1311 client.start_with_default_executor();
1312
1313 let mut flag = basic_flag("myFlag");
1314 flag.debug_events_until_date = Some(64_060_606_800_000); client
1317 .data_store
1318 .write()
1319 .upsert(
1320 &flag.key,
1321 PatchTarget::Flag(StorageItem::Item(flag.clone())),
1322 )
1323 .expect("patch should apply");
1324 let context = ContextBuilder::new("bob")
1325 .build()
1326 .expect("Failed to create context");
1327
1328 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1329
1330 assert!(detail.value.unwrap().as_bool().unwrap());
1331 assert!(matches!(
1332 detail.reason,
1333 Reason::Fallthrough {
1334 in_experiment: false
1335 }
1336 ));
1337 client.flush();
1338 client.close();
1339
1340 let events = event_rx.try_iter().collect::<Vec<OutputEvent>>();
1341 assert_eq!(events.len(), 3);
1342 assert_eq!(events[0].kind(), "index");
1343 assert_eq!(events[1].kind(), "debug");
1344 assert_eq!(events[2].kind(), "summary");
1345
1346 if let OutputEvent::Summary(event_summary) = events[2].clone() {
1347 let variation_key = VariationKey {
1348 version: Some(42),
1349 variation: Some(1),
1350 };
1351
1352 let feature = event_summary.features.get("myFlag");
1353 assert!(feature.is_some());
1354
1355 let feature = feature.unwrap();
1356 assert!(feature.counters.contains_key(&variation_key));
1357 } else {
1358 panic!("Event should be a summary type");
1359 }
1360 }
1361
1362 #[test]
1363 fn variation_detail_tracks_events_correctly() {
1364 let (client, event_rx) = make_mocked_client();
1365 client.start_with_default_executor();
1366
1367 client
1368 .data_store
1369 .write()
1370 .upsert(
1371 "myFlag",
1372 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1373 )
1374 .expect("patch should apply");
1375 let context = ContextBuilder::new("bob")
1376 .build()
1377 .expect("Failed to create context");
1378
1379 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1380
1381 assert!(detail.value.unwrap().as_bool().unwrap());
1382 assert!(matches!(
1383 detail.reason,
1384 Reason::Fallthrough {
1385 in_experiment: false
1386 }
1387 ));
1388 client.flush();
1389 client.close();
1390
1391 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1392 assert_eq!(events.len(), 2);
1393 assert_eq!(events[0].kind(), "index");
1394 assert_eq!(events[1].kind(), "summary");
1395
1396 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1397 let variation_key = VariationKey {
1398 version: Some(42),
1399 variation: Some(1),
1400 };
1401
1402 let feature = event_summary.features.get("myFlag");
1403 assert!(feature.is_some());
1404
1405 let feature = feature.unwrap();
1406 assert!(feature.counters.contains_key(&variation_key));
1407 } else {
1408 panic!("Event should be a summary type");
1409 }
1410 }
1411
1412 #[test]
1413 fn variation_detail_handles_offline_mode() {
1414 let (client, event_rx) = make_mocked_offline_client();
1415 client.start_with_default_executor();
1416
1417 let context = ContextBuilder::new("bob")
1418 .build()
1419 .expect("Failed to create context");
1420
1421 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1422
1423 assert!(!detail.value.unwrap().as_bool().unwrap());
1424 assert!(matches!(
1425 detail.reason,
1426 Reason::Error {
1427 error: eval::Error::ClientNotReady
1428 }
1429 ));
1430 client.flush();
1431 client.close();
1432
1433 assert_eq!(event_rx.iter().count(), 0);
1434 }
1435
1436 struct InMemoryPersistentDataStoreFactory {
1437 data: AllData<Flag, Segment>,
1438 initialized: bool,
1439 }
1440
1441 impl PersistentDataStoreFactory for InMemoryPersistentDataStoreFactory {
1442 fn create_persistent_data_store(
1443 &self,
1444 ) -> Result<Box<(dyn PersistentDataStore + 'static)>, std::io::Error> {
1445 let serialized_data =
1446 AllData::<SerializedItem, SerializedItem>::try_from(self.data.clone())?;
1447 Ok(Box::new(InMemoryPersistentDataStore {
1448 data: serialized_data,
1449 initialized: self.initialized,
1450 }))
1451 }
1452 }
1453
1454 #[test]
1455 fn variation_detail_handles_daemon_mode() {
1456 testing_logger::setup();
1457 let factory = InMemoryPersistentDataStoreFactory {
1458 data: AllData {
1459 flags: hashmap!["flag".into() => basic_flag("flag")],
1460 segments: HashMap::new(),
1461 },
1462 initialized: true,
1463 };
1464 let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1465
1466 let config = ConfigBuilder::new("sdk-key")
1467 .daemon_mode(true)
1468 .data_store(&builder)
1469 .event_processor(&NullEventProcessorBuilder::new())
1470 .build()
1471 .expect("config should build");
1472
1473 let client = Client::build(config).expect("Should be built.");
1474
1475 client.start_with_default_executor();
1476
1477 let context = ContextBuilder::new("bob")
1478 .build()
1479 .expect("Failed to create context");
1480
1481 let detail = client.variation_detail(&context, "flag", FlagValue::Bool(false));
1482
1483 assert!(detail.value.unwrap().as_bool().unwrap());
1484 assert!(matches!(
1485 detail.reason,
1486 Reason::Fallthrough {
1487 in_experiment: false
1488 }
1489 ));
1490 client.flush();
1491 client.close();
1492
1493 testing_logger::validate(|captured_logs| {
1494 assert_eq!(captured_logs.len(), 1);
1495 assert_eq!(
1496 captured_logs[0].body,
1497 "Started LaunchDarkly Client in daemon mode"
1498 );
1499 });
1500 }
1501
1502 #[test]
1503 fn daemon_mode_is_quiet_if_store_is_not_initialized() {
1504 testing_logger::setup();
1505
1506 let factory = InMemoryPersistentDataStoreFactory {
1507 data: AllData {
1508 flags: HashMap::new(),
1509 segments: HashMap::new(),
1510 },
1511 initialized: false,
1512 };
1513 let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1514
1515 let config = ConfigBuilder::new("sdk-key")
1516 .daemon_mode(true)
1517 .data_store(&builder)
1518 .event_processor(&NullEventProcessorBuilder::new())
1519 .build()
1520 .expect("config should build");
1521
1522 let client = Client::build(config).expect("Should be built.");
1523
1524 client.start_with_default_executor();
1525
1526 let context = ContextBuilder::new("bob")
1527 .build()
1528 .expect("Failed to create context");
1529
1530 client.variation_detail(&context, "flag", FlagValue::Bool(false));
1531
1532 testing_logger::validate(|captured_logs| {
1533 assert_eq!(captured_logs.len(), 1);
1534 assert_eq!(
1535 captured_logs[0].body,
1536 "Started LaunchDarkly Client in daemon mode"
1537 );
1538 });
1539 }
1540
1541 #[test]
1542 fn variation_handles_off_flag_without_variation() {
1543 let (client, event_rx) = make_mocked_client();
1544 client.start_with_default_executor();
1545
1546 client
1547 .data_store
1548 .write()
1549 .upsert(
1550 "myFlag",
1551 PatchTarget::Flag(StorageItem::Item(basic_off_flag("myFlag"))),
1552 )
1553 .expect("patch should apply");
1554 let context = ContextBuilder::new("bob")
1555 .build()
1556 .expect("Failed to create context");
1557
1558 let result = client.variation(&context, "myFlag", FlagValue::Bool(false));
1559
1560 assert!(!result.as_bool().unwrap());
1561 client.flush();
1562 client.close();
1563
1564 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1565 assert_eq!(events.len(), 2);
1566 assert_eq!(events[0].kind(), "index");
1567 assert_eq!(events[1].kind(), "summary");
1568
1569 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1570 let variation_key = VariationKey {
1571 version: Some(42),
1572 variation: None,
1573 };
1574 let feature = event_summary.features.get("myFlag");
1575 assert!(feature.is_some());
1576
1577 let feature = feature.unwrap();
1578 assert!(feature.counters.contains_key(&variation_key));
1579 } else {
1580 panic!("Event should be a summary type");
1581 }
1582 }
1583
1584 #[test]
1585 fn variation_detail_tracks_prereq_events_correctly() {
1586 let (client, event_rx) = make_mocked_client();
1587 client.start_with_default_executor();
1588
1589 let mut basic_preqreq_flag = basic_flag("prereqFlag");
1590 basic_preqreq_flag.track_events = true;
1591
1592 client
1593 .data_store
1594 .write()
1595 .upsert(
1596 "prereqFlag",
1597 PatchTarget::Flag(StorageItem::Item(basic_preqreq_flag)),
1598 )
1599 .expect("patch should apply");
1600
1601 let mut basic_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1602 basic_flag.track_events = true;
1603 client
1604 .data_store
1605 .write()
1606 .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(basic_flag)))
1607 .expect("patch should apply");
1608 let context = ContextBuilder::new("bob")
1609 .build()
1610 .expect("Failed to create context");
1611
1612 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1613
1614 assert!(detail.value.unwrap().as_bool().unwrap());
1615 assert!(matches!(
1616 detail.reason,
1617 Reason::Fallthrough {
1618 in_experiment: false
1619 }
1620 ));
1621 client.flush();
1622 client.close();
1623
1624 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1625 assert_eq!(events.len(), 4);
1626 assert_eq!(events[0].kind(), "index");
1627 assert_eq!(events[1].kind(), "feature");
1628 assert_eq!(events[2].kind(), "feature");
1629 assert_eq!(events[3].kind(), "summary");
1630
1631 if let OutputEvent::Summary(event_summary) = events[3].clone() {
1632 let variation_key = VariationKey {
1633 version: Some(42),
1634 variation: Some(1),
1635 };
1636 let feature = event_summary.features.get("myFlag");
1637 assert!(feature.is_some());
1638
1639 let feature = feature.unwrap();
1640 assert!(feature.counters.contains_key(&variation_key));
1641
1642 let variation_key = VariationKey {
1643 version: Some(42),
1644 variation: Some(1),
1645 };
1646 let feature = event_summary.features.get("prereqFlag");
1647 assert!(feature.is_some());
1648
1649 let feature = feature.unwrap();
1650 assert!(feature.counters.contains_key(&variation_key));
1651 }
1652 }
1653
1654 #[test]
1655 fn variation_handles_failed_prereqs_correctly() {
1656 let (client, event_rx) = make_mocked_client();
1657 client.start_with_default_executor();
1658
1659 let mut basic_preqreq_flag = basic_off_flag("prereqFlag");
1660 basic_preqreq_flag.track_events = true;
1661
1662 client
1663 .data_store
1664 .write()
1665 .upsert(
1666 "prereqFlag",
1667 PatchTarget::Flag(StorageItem::Item(basic_preqreq_flag)),
1668 )
1669 .expect("patch should apply");
1670
1671 let mut basic_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1672 basic_flag.track_events = true;
1673 client
1674 .data_store
1675 .write()
1676 .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(basic_flag)))
1677 .expect("patch should apply");
1678 let context = ContextBuilder::new("bob")
1679 .build()
1680 .expect("Failed to create context");
1681
1682 let detail = client.variation(&context, "myFlag", FlagValue::Bool(false));
1683
1684 assert!(!detail.as_bool().unwrap());
1685 client.flush();
1686 client.close();
1687
1688 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1689 assert_eq!(events.len(), 4);
1690 assert_eq!(events[0].kind(), "index");
1691 assert_eq!(events[1].kind(), "feature");
1692 assert_eq!(events[2].kind(), "feature");
1693 assert_eq!(events[3].kind(), "summary");
1694
1695 if let OutputEvent::Summary(event_summary) = events[3].clone() {
1696 let variation_key = VariationKey {
1697 version: Some(42),
1698 variation: Some(0),
1699 };
1700 let feature = event_summary.features.get("myFlag");
1701 assert!(feature.is_some());
1702
1703 let feature = feature.unwrap();
1704 assert!(feature.counters.contains_key(&variation_key));
1705
1706 let variation_key = VariationKey {
1707 version: Some(42),
1708 variation: None,
1709 };
1710 let feature = event_summary.features.get("prereqFlag");
1711 assert!(feature.is_some());
1712
1713 let feature = feature.unwrap();
1714 assert!(feature.counters.contains_key(&variation_key));
1715 }
1716 }
1717
1718 #[test]
1719 fn variation_detail_handles_flag_not_found() {
1720 let (client, event_rx) = make_mocked_client();
1721 client.start_with_default_executor();
1722
1723 let context = ContextBuilder::new("bob")
1724 .build()
1725 .expect("Failed to create context");
1726 let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1727
1728 assert!(!detail.value.unwrap().as_bool().unwrap());
1729 assert!(matches!(
1730 detail.reason,
1731 Reason::Error {
1732 error: eval::Error::FlagNotFound
1733 }
1734 ));
1735 client.flush();
1736 client.close();
1737
1738 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1739 assert_eq!(events.len(), 2);
1740 assert_eq!(events[0].kind(), "index");
1741 assert_eq!(events[1].kind(), "summary");
1742
1743 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1744 let variation_key = VariationKey {
1745 version: None,
1746 variation: None,
1747 };
1748 let feature = event_summary.features.get("non-existent-flag");
1749 assert!(feature.is_some());
1750
1751 let feature = feature.unwrap();
1752 assert!(feature.counters.contains_key(&variation_key));
1753 } else {
1754 panic!("Event should be a summary type");
1755 }
1756 }
1757
1758 #[tokio::test]
1759 async fn variation_detail_handles_client_not_ready() {
1760 let (client, event_rx) = make_mocked_client_with_delay(u64::MAX, false, false);
1761 client.start_with_default_executor();
1762 let context = ContextBuilder::new("bob")
1763 .build()
1764 .expect("Failed to create context");
1765
1766 let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1767
1768 assert!(!detail.value.unwrap().as_bool().unwrap());
1769 assert!(matches!(
1770 detail.reason,
1771 Reason::Error {
1772 error: eval::Error::ClientNotReady
1773 }
1774 ));
1775 client.flush();
1776 client.close();
1777
1778 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1779 assert_eq!(events.len(), 2);
1780 assert_eq!(events[0].kind(), "index");
1781 assert_eq!(events[1].kind(), "summary");
1782
1783 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1784 let variation_key = VariationKey {
1785 version: None,
1786 variation: None,
1787 };
1788 let feature = event_summary.features.get("non-existent-flag");
1789 assert!(feature.is_some());
1790
1791 let feature = feature.unwrap();
1792 assert!(feature.counters.contains_key(&variation_key));
1793 } else {
1794 panic!("Event should be a summary type");
1795 }
1796 }
1797
1798 #[test]
1799 fn identify_sends_identify_event() {
1800 let (client, event_rx) = make_mocked_client();
1801 client.start_with_default_executor();
1802
1803 let context = ContextBuilder::new("bob")
1804 .build()
1805 .expect("Failed to create context");
1806
1807 client.identify(context);
1808 client.flush();
1809 client.close();
1810
1811 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1812 assert_eq!(events.len(), 1);
1813 assert_eq!(events[0].kind(), "identify");
1814 }
1815
1816 #[test]
1817 fn identify_sends_sends_nothing_in_offline_mode() {
1818 let (client, event_rx) = make_mocked_offline_client();
1819 client.start_with_default_executor();
1820
1821 let context = ContextBuilder::new("bob")
1822 .build()
1823 .expect("Failed to create context");
1824
1825 client.identify(context);
1826 client.flush();
1827 client.close();
1828
1829 assert_eq!(event_rx.iter().count(), 0);
1830 }
1831
1832 #[test]
1833 fn secure_mode_hash() {
1834 let config = ConfigBuilder::new("secret")
1835 .offline(true)
1836 .build()
1837 .expect("config should build");
1838 let client = Client::build(config).expect("Should be built.");
1839 let context = ContextBuilder::new("Message")
1840 .build()
1841 .expect("Failed to create context");
1842
1843 assert_eq!(
1844 client.secure_mode_hash(&context),
1845 "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597"
1846 );
1847 }
1848
1849 #[test]
1850 fn secure_mode_hash_with_multi_kind() {
1851 let config = ConfigBuilder::new("secret")
1852 .offline(true)
1853 .build()
1854 .expect("config should build");
1855 let client = Client::build(config).expect("Should be built.");
1856
1857 let org = ContextBuilder::new("org-key|1")
1858 .kind("org")
1859 .build()
1860 .expect("Failed to create context");
1861 let user = ContextBuilder::new("user-key:2")
1862 .build()
1863 .expect("Failed to create context");
1864
1865 let context = MultiContextBuilder::new()
1866 .add_context(org)
1867 .add_context(user)
1868 .build()
1869 .expect("failed to build multi-context");
1870
1871 assert_eq!(
1872 client.secure_mode_hash(&context),
1873 "5687e6383b920582ed50c2a96c98a115f1b6aad85a60579d761d9b8797415163"
1874 );
1875 }
1876
1877 #[derive(Serialize)]
1878 struct MyCustomData {
1879 pub answer: u32,
1880 }
1881
1882 #[test]
1883 fn track_sends_track_and_index_events() -> serde_json::Result<()> {
1884 let (client, event_rx) = make_mocked_client();
1885 client.start_with_default_executor();
1886
1887 let context = ContextBuilder::new("bob")
1888 .build()
1889 .expect("Failed to create context");
1890
1891 client.track_event(context.clone(), "event-with-null");
1892 client.track_data(context.clone(), "event-with-string", "string-data")?;
1893 client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1894 client.track_data(
1895 context.clone(),
1896 "event-with-struct",
1897 MyCustomData { answer: 42 },
1898 )?;
1899 client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1900
1901 client.flush();
1902 client.close();
1903
1904 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1905 assert_eq!(events.len(), 6);
1906
1907 let mut events_by_type: HashMap<&str, usize> = HashMap::new();
1908 for event in events {
1909 if let Some(count) = events_by_type.get_mut(event.kind()) {
1910 *count += 1;
1911 } else {
1912 events_by_type.insert(event.kind(), 1);
1913 }
1914 }
1915 assert!(matches!(events_by_type.get("index"), Some(1)));
1916 assert!(matches!(events_by_type.get("custom"), Some(5)));
1917
1918 Ok(())
1919 }
1920
1921 #[test]
1922 fn track_sends_nothing_in_offline_mode() -> serde_json::Result<()> {
1923 let (client, event_rx) = make_mocked_offline_client();
1924 client.start_with_default_executor();
1925
1926 let context = ContextBuilder::new("bob")
1927 .build()
1928 .expect("Failed to create context");
1929
1930 client.track_event(context.clone(), "event-with-null");
1931 client.track_data(context.clone(), "event-with-string", "string-data")?;
1932 client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1933 client.track_data(
1934 context.clone(),
1935 "event-with-struct",
1936 MyCustomData { answer: 42 },
1937 )?;
1938 client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1939
1940 client.flush();
1941 client.close();
1942
1943 assert_eq!(event_rx.iter().count(), 0);
1944
1945 Ok(())
1946 }
1947
1948 #[test]
1949 fn migration_handles_flag_not_found() {
1950 let (client, _event_rx) = make_mocked_client();
1951 client.start_with_default_executor();
1952
1953 let context = ContextBuilder::new("bob")
1954 .build()
1955 .expect("Failed to create context");
1956
1957 let (stage, _tracker) =
1958 client.migration_variation(&context, "non-existent-flag-key", Stage::Off);
1959
1960 assert_eq!(stage, Stage::Off);
1961 }
1962
1963 #[test]
1964 fn migration_uses_non_migration_flag() {
1965 let (client, _event_rx) = make_mocked_client();
1966 client.start_with_default_executor();
1967 client
1968 .data_store
1969 .write()
1970 .upsert(
1971 "boolean-flag",
1972 PatchTarget::Flag(StorageItem::Item(basic_flag("boolean-flag"))),
1973 )
1974 .expect("patch should apply");
1975
1976 let context = ContextBuilder::new("bob")
1977 .build()
1978 .expect("Failed to create context");
1979
1980 let (stage, _tracker) = client.migration_variation(&context, "boolean-flag", Stage::Off);
1981
1982 assert_eq!(stage, Stage::Off);
1983 }
1984
1985 #[test_case(Stage::Off)]
1986 #[test_case(Stage::DualWrite)]
1987 #[test_case(Stage::Shadow)]
1988 #[test_case(Stage::Live)]
1989 #[test_case(Stage::Rampdown)]
1990 #[test_case(Stage::Complete)]
1991 fn migration_can_determine_correct_stage_from_flag(stage: Stage) {
1992 let (client, _event_rx) = make_mocked_client();
1993 client.start_with_default_executor();
1994 client
1995 .data_store
1996 .write()
1997 .upsert(
1998 "stage-flag",
1999 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2000 )
2001 .expect("patch should apply");
2002
2003 let context = ContextBuilder::new("bob")
2004 .build()
2005 .expect("Failed to create context");
2006
2007 let (evaluated_stage, _tracker) =
2008 client.migration_variation(&context, "stage-flag", Stage::Off);
2009
2010 assert_eq!(evaluated_stage, stage);
2011 }
2012
2013 #[tokio::test]
2014 async fn migration_tracks_invoked_correctly() {
2015 migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Read, vec![Origin::Old])
2016 .await;
2017 migration_tracks_invoked_correctly_driver(
2018 Stage::DualWrite,
2019 Operation::Read,
2020 vec![Origin::Old],
2021 )
2022 .await;
2023 migration_tracks_invoked_correctly_driver(
2024 Stage::Shadow,
2025 Operation::Read,
2026 vec![Origin::Old, Origin::New],
2027 )
2028 .await;
2029 migration_tracks_invoked_correctly_driver(
2030 Stage::Live,
2031 Operation::Read,
2032 vec![Origin::Old, Origin::New],
2033 )
2034 .await;
2035 migration_tracks_invoked_correctly_driver(
2036 Stage::Rampdown,
2037 Operation::Read,
2038 vec![Origin::New],
2039 )
2040 .await;
2041 migration_tracks_invoked_correctly_driver(
2042 Stage::Complete,
2043 Operation::Read,
2044 vec![Origin::New],
2045 )
2046 .await;
2047 migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Write, vec![Origin::Old])
2048 .await;
2049 migration_tracks_invoked_correctly_driver(
2050 Stage::DualWrite,
2051 Operation::Write,
2052 vec![Origin::Old, Origin::New],
2053 )
2054 .await;
2055 migration_tracks_invoked_correctly_driver(
2056 Stage::Shadow,
2057 Operation::Write,
2058 vec![Origin::Old, Origin::New],
2059 )
2060 .await;
2061 migration_tracks_invoked_correctly_driver(
2062 Stage::Live,
2063 Operation::Write,
2064 vec![Origin::Old, Origin::New],
2065 )
2066 .await;
2067 migration_tracks_invoked_correctly_driver(
2068 Stage::Rampdown,
2069 Operation::Write,
2070 vec![Origin::Old, Origin::New],
2071 )
2072 .await;
2073 migration_tracks_invoked_correctly_driver(
2074 Stage::Complete,
2075 Operation::Write,
2076 vec![Origin::New],
2077 )
2078 .await;
2079 }
2080
2081 async fn migration_tracks_invoked_correctly_driver(
2082 stage: Stage,
2083 operation: Operation,
2084 origins: Vec<Origin>,
2085 ) {
2086 let (client, event_rx) = make_mocked_client();
2087 let client = Arc::new(client);
2088 client.start_with_default_executor();
2089 client
2090 .data_store
2091 .write()
2092 .upsert(
2093 "stage-flag",
2094 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2095 )
2096 .expect("patch should apply");
2097
2098 let mut migrator = MigratorBuilder::new(client.clone())
2099 .read(
2100 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2101 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2102 Some(|_, _| true),
2103 )
2104 .write(
2105 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2106 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2107 )
2108 .build()
2109 .expect("migrator should build");
2110
2111 let context = ContextBuilder::new("bob")
2112 .build()
2113 .expect("Failed to create context");
2114
2115 if let Operation::Read = operation {
2116 migrator
2117 .read(
2118 &context,
2119 "stage-flag".into(),
2120 Stage::Off,
2121 serde_json::Value::Null,
2122 )
2123 .await;
2124 } else {
2125 migrator
2126 .write(
2127 &context,
2128 "stage-flag".into(),
2129 Stage::Off,
2130 serde_json::Value::Null,
2131 )
2132 .await;
2133 }
2134
2135 client.flush();
2136 client.close();
2137
2138 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2139 assert_eq!(events.len(), 3);
2140 match &events[1] {
2141 OutputEvent::MigrationOp(event) => {
2142 assert!(event.invoked.len() == origins.len());
2143 assert!(event.invoked.iter().all(|i| origins.contains(i)));
2144 }
2145 _ => panic!("Expected migration event"),
2146 }
2147 }
2148
2149 #[tokio::test]
2150 async fn migration_tracks_latency() {
2151 migration_tracks_latency_driver(Stage::Off, Operation::Read, vec![Origin::Old]).await;
2152 migration_tracks_latency_driver(Stage::DualWrite, Operation::Read, vec![Origin::Old]).await;
2153 migration_tracks_latency_driver(
2154 Stage::Shadow,
2155 Operation::Read,
2156 vec![Origin::Old, Origin::New],
2157 )
2158 .await;
2159 migration_tracks_latency_driver(
2160 Stage::Live,
2161 Operation::Read,
2162 vec![Origin::Old, Origin::New],
2163 )
2164 .await;
2165 migration_tracks_latency_driver(Stage::Rampdown, Operation::Read, vec![Origin::New]).await;
2166 migration_tracks_latency_driver(Stage::Complete, Operation::Read, vec![Origin::New]).await;
2167 migration_tracks_latency_driver(Stage::Off, Operation::Write, vec![Origin::Old]).await;
2168 migration_tracks_latency_driver(
2169 Stage::DualWrite,
2170 Operation::Write,
2171 vec![Origin::Old, Origin::New],
2172 )
2173 .await;
2174 migration_tracks_latency_driver(
2175 Stage::Shadow,
2176 Operation::Write,
2177 vec![Origin::Old, Origin::New],
2178 )
2179 .await;
2180 migration_tracks_latency_driver(
2181 Stage::Live,
2182 Operation::Write,
2183 vec![Origin::Old, Origin::New],
2184 )
2185 .await;
2186 migration_tracks_latency_driver(
2187 Stage::Rampdown,
2188 Operation::Write,
2189 vec![Origin::Old, Origin::New],
2190 )
2191 .await;
2192 migration_tracks_latency_driver(Stage::Complete, Operation::Write, vec![Origin::New]).await;
2193 }
2194
2195 async fn migration_tracks_latency_driver(
2196 stage: Stage,
2197 operation: Operation,
2198 origins: Vec<Origin>,
2199 ) {
2200 let (client, event_rx) = make_mocked_client();
2201 let client = Arc::new(client);
2202 client.start_with_default_executor();
2203 client
2204 .data_store
2205 .write()
2206 .upsert(
2207 "stage-flag",
2208 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2209 )
2210 .expect("patch should apply");
2211
2212 let mut migrator = MigratorBuilder::new(client.clone())
2213 .track_latency(true)
2214 .read(
2215 |_| {
2216 async move {
2217 async_std::task::sleep(Duration::from_millis(100)).await;
2218 Ok(serde_json::Value::Null)
2219 }
2220 .boxed()
2221 },
2222 |_| {
2223 async move {
2224 async_std::task::sleep(Duration::from_millis(100)).await;
2225 Ok(serde_json::Value::Null)
2226 }
2227 .boxed()
2228 },
2229 Some(|_, _| true),
2230 )
2231 .write(
2232 |_| {
2233 async move {
2234 async_std::task::sleep(Duration::from_millis(100)).await;
2235 Ok(serde_json::Value::Null)
2236 }
2237 .boxed()
2238 },
2239 |_| {
2240 async move {
2241 async_std::task::sleep(Duration::from_millis(100)).await;
2242 Ok(serde_json::Value::Null)
2243 }
2244 .boxed()
2245 },
2246 )
2247 .build()
2248 .expect("migrator should build");
2249
2250 let context = ContextBuilder::new("bob")
2251 .build()
2252 .expect("Failed to create context");
2253
2254 if let Operation::Read = operation {
2255 migrator
2256 .read(
2257 &context,
2258 "stage-flag".into(),
2259 Stage::Off,
2260 serde_json::Value::Null,
2261 )
2262 .await;
2263 } else {
2264 migrator
2265 .write(
2266 &context,
2267 "stage-flag".into(),
2268 Stage::Off,
2269 serde_json::Value::Null,
2270 )
2271 .await;
2272 }
2273
2274 client.flush();
2275 client.close();
2276
2277 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2278 assert_eq!(events.len(), 3);
2279 match &events[1] {
2280 OutputEvent::MigrationOp(event) => {
2281 assert!(event.latency.len() == origins.len());
2282 assert!(event
2283 .latency
2284 .values()
2285 .all(|l| l > &Duration::from_millis(100)));
2286 }
2287 _ => panic!("Expected migration event"),
2288 }
2289 }
2290
2291 #[tokio::test]
2292 async fn migration_tracks_read_errors() {
2293 migration_tracks_read_errors_driver(Stage::Off, vec![Origin::Old]).await;
2294 migration_tracks_read_errors_driver(Stage::DualWrite, vec![Origin::Old]).await;
2295 migration_tracks_read_errors_driver(Stage::Shadow, vec![Origin::Old, Origin::New]).await;
2296 migration_tracks_read_errors_driver(Stage::Live, vec![Origin::Old, Origin::New]).await;
2297 migration_tracks_read_errors_driver(Stage::Rampdown, vec![Origin::New]).await;
2298 migration_tracks_read_errors_driver(Stage::Complete, vec![Origin::New]).await;
2299 }
2300
2301 async fn migration_tracks_read_errors_driver(stage: Stage, origins: Vec<Origin>) {
2302 let (client, event_rx) = make_mocked_client();
2303 let client = Arc::new(client);
2304 client.start_with_default_executor();
2305 client
2306 .data_store
2307 .write()
2308 .upsert(
2309 "stage-flag",
2310 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2311 )
2312 .expect("patch should apply");
2313
2314 let mut migrator = MigratorBuilder::new(client.clone())
2315 .track_latency(true)
2316 .read(
2317 |_| async move { Err("fail".into()) }.boxed(),
2318 |_| async move { Err("fail".into()) }.boxed(),
2319 Some(|_: &String, _: &String| true),
2320 )
2321 .write(
2322 |_| async move { Err("fail".into()) }.boxed(),
2323 |_| async move { Err("fail".into()) }.boxed(),
2324 )
2325 .build()
2326 .expect("migrator should build");
2327
2328 let context = ContextBuilder::new("bob")
2329 .build()
2330 .expect("Failed to create context");
2331
2332 migrator
2333 .read(
2334 &context,
2335 "stage-flag".into(),
2336 Stage::Off,
2337 serde_json::Value::Null,
2338 )
2339 .await;
2340 client.flush();
2341 client.close();
2342
2343 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2344 assert_eq!(events.len(), 3);
2345 match &events[1] {
2346 OutputEvent::MigrationOp(event) => {
2347 assert!(event.errors.len() == origins.len());
2348 assert!(event.errors.iter().all(|i| origins.contains(i)));
2349 }
2350 _ => panic!("Expected migration event"),
2351 }
2352 }
2353
2354 #[tokio::test]
2355 async fn migration_tracks_authoritative_write_errors() {
2356 migration_tracks_authoritative_write_errors_driver(Stage::Off, vec![Origin::Old]).await;
2357 migration_tracks_authoritative_write_errors_driver(Stage::DualWrite, vec![Origin::Old])
2358 .await;
2359 migration_tracks_authoritative_write_errors_driver(Stage::Shadow, vec![Origin::Old]).await;
2360 migration_tracks_authoritative_write_errors_driver(Stage::Live, vec![Origin::New]).await;
2361 migration_tracks_authoritative_write_errors_driver(Stage::Rampdown, vec![Origin::New])
2362 .await;
2363 migration_tracks_authoritative_write_errors_driver(Stage::Complete, vec![Origin::New])
2364 .await;
2365 }
2366
2367 async fn migration_tracks_authoritative_write_errors_driver(
2368 stage: Stage,
2369 origins: Vec<Origin>,
2370 ) {
2371 let (client, event_rx) = make_mocked_client();
2372 let client = Arc::new(client);
2373 client.start_with_default_executor();
2374 client
2375 .data_store
2376 .write()
2377 .upsert(
2378 "stage-flag",
2379 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2380 )
2381 .expect("patch should apply");
2382
2383 let mut migrator = MigratorBuilder::new(client.clone())
2384 .track_latency(true)
2385 .read(
2386 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2387 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2388 None,
2389 )
2390 .write(
2391 |_| async move { Err("fail".into()) }.boxed(),
2392 |_| async move { Err("fail".into()) }.boxed(),
2393 )
2394 .build()
2395 .expect("migrator should build");
2396
2397 let context = ContextBuilder::new("bob")
2398 .build()
2399 .expect("Failed to create context");
2400
2401 migrator
2402 .write(
2403 &context,
2404 "stage-flag".into(),
2405 Stage::Off,
2406 serde_json::Value::Null,
2407 )
2408 .await;
2409
2410 client.flush();
2411 client.close();
2412
2413 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2414 assert_eq!(events.len(), 3);
2415 match &events[1] {
2416 OutputEvent::MigrationOp(event) => {
2417 assert!(event.errors.len() == origins.len());
2418 assert!(event.errors.iter().all(|i| origins.contains(i)));
2419 }
2420 _ => panic!("Expected migration event"),
2421 }
2422 }
2423
2424 #[tokio::test]
2425 async fn migration_tracks_nonauthoritative_write_errors() {
2426 migration_tracks_nonauthoritative_write_errors_driver(
2427 Stage::DualWrite,
2428 false,
2429 true,
2430 vec![Origin::New],
2431 )
2432 .await;
2433 migration_tracks_nonauthoritative_write_errors_driver(
2434 Stage::Shadow,
2435 false,
2436 true,
2437 vec![Origin::New],
2438 )
2439 .await;
2440 migration_tracks_nonauthoritative_write_errors_driver(
2441 Stage::Live,
2442 true,
2443 false,
2444 vec![Origin::Old],
2445 )
2446 .await;
2447 migration_tracks_nonauthoritative_write_errors_driver(
2448 Stage::Rampdown,
2449 true,
2450 false,
2451 vec![Origin::Old],
2452 )
2453 .await;
2454 }
2455
2456 async fn migration_tracks_nonauthoritative_write_errors_driver(
2457 stage: Stage,
2458 fail_old: bool,
2459 fail_new: bool,
2460 origins: Vec<Origin>,
2461 ) {
2462 let (client, event_rx) = make_mocked_client();
2463 let client = Arc::new(client);
2464 client.start_with_default_executor();
2465 client
2466 .data_store
2467 .write()
2468 .upsert(
2469 "stage-flag",
2470 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2471 )
2472 .expect("patch should apply");
2473
2474 let mut migrator = MigratorBuilder::new(client.clone())
2475 .track_latency(true)
2476 .read(
2477 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2478 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2479 None,
2480 )
2481 .write(
2482 move |_| {
2483 async move {
2484 if fail_old {
2485 Err("fail".into())
2486 } else {
2487 Ok(serde_json::Value::Null)
2488 }
2489 }
2490 .boxed()
2491 },
2492 move |_| {
2493 async move {
2494 if fail_new {
2495 Err("fail".into())
2496 } else {
2497 Ok(serde_json::Value::Null)
2498 }
2499 }
2500 .boxed()
2501 },
2502 )
2503 .build()
2504 .expect("migrator should build");
2505
2506 let context = ContextBuilder::new("bob")
2507 .build()
2508 .expect("Failed to create context");
2509
2510 migrator
2511 .write(
2512 &context,
2513 "stage-flag".into(),
2514 Stage::Off,
2515 serde_json::Value::Null,
2516 )
2517 .await;
2518
2519 client.flush();
2520 client.close();
2521
2522 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2523 assert_eq!(events.len(), 3);
2524 match &events[1] {
2525 OutputEvent::MigrationOp(event) => {
2526 assert!(event.errors.len() == origins.len());
2527 assert!(event.errors.iter().all(|i| origins.contains(i)));
2528 }
2529 _ => panic!("Expected migration event"),
2530 }
2531 }
2532
2533 #[tokio::test]
2534 async fn migration_tracks_consistency() {
2535 migration_tracks_consistency_driver(Stage::Shadow, "same", "same", true).await;
2536 migration_tracks_consistency_driver(Stage::Shadow, "same", "different", false).await;
2537 migration_tracks_consistency_driver(Stage::Live, "same", "same", true).await;
2538 migration_tracks_consistency_driver(Stage::Live, "same", "different", false).await;
2539 }
2540
2541 async fn migration_tracks_consistency_driver(
2542 stage: Stage,
2543 old_return: &'static str,
2544 new_return: &'static str,
2545 expected_consistency: bool,
2546 ) {
2547 let (client, event_rx) = make_mocked_client();
2548 let client = Arc::new(client);
2549 client.start_with_default_executor();
2550 client
2551 .data_store
2552 .write()
2553 .upsert(
2554 "stage-flag",
2555 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2556 )
2557 .expect("patch should apply");
2558
2559 let mut migrator = MigratorBuilder::new(client.clone())
2560 .track_latency(true)
2561 .read(
2562 |_| {
2563 async move {
2564 async_std::task::sleep(Duration::from_millis(100)).await;
2565 Ok(serde_json::Value::String(old_return.to_string()))
2566 }
2567 .boxed()
2568 },
2569 |_| {
2570 async move {
2571 async_std::task::sleep(Duration::from_millis(100)).await;
2572 Ok(serde_json::Value::String(new_return.to_string()))
2573 }
2574 .boxed()
2575 },
2576 Some(|lhs, rhs| lhs == rhs),
2577 )
2578 .write(
2579 |_| {
2580 async move {
2581 async_std::task::sleep(Duration::from_millis(100)).await;
2582 Ok(serde_json::Value::Null)
2583 }
2584 .boxed()
2585 },
2586 |_| {
2587 async move {
2588 async_std::task::sleep(Duration::from_millis(100)).await;
2589 Ok(serde_json::Value::Null)
2590 }
2591 .boxed()
2592 },
2593 )
2594 .build()
2595 .expect("migrator should build");
2596
2597 let context = ContextBuilder::new("bob")
2598 .build()
2599 .expect("Failed to create context");
2600
2601 migrator
2602 .read(
2603 &context,
2604 "stage-flag".into(),
2605 Stage::Off,
2606 serde_json::Value::Null,
2607 )
2608 .await;
2609
2610 client.flush();
2611 client.close();
2612
2613 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2614 assert_eq!(events.len(), 3);
2615 match &events[1] {
2616 OutputEvent::MigrationOp(event) => {
2617 assert!(event.consistency_check == Some(expected_consistency))
2618 }
2619 _ => panic!("Expected migration event"),
2620 }
2621 }
2622
2623 fn make_mocked_client_with_delay(
2624 delay: u64,
2625 offline: bool,
2626 daemon_mode: bool,
2627 ) -> (Client, Receiver<OutputEvent>) {
2628 let updates = Arc::new(MockDataSource::new_with_init_delay(delay));
2629 let (event_sender, event_rx) = create_event_sender();
2630
2631 let config = ConfigBuilder::new("sdk-key")
2632 .offline(offline)
2633 .daemon_mode(daemon_mode)
2634 .data_source(MockDataSourceBuilder::new().data_source(updates))
2635 .event_processor(
2636 EventProcessorBuilder::<HttpConnector>::new().event_sender(Arc::new(event_sender)),
2637 )
2638 .build()
2639 .expect("config should build");
2640
2641 let client = Client::build(config).expect("Should be built.");
2642
2643 (client, event_rx)
2644 }
2645
2646 fn make_mocked_offline_client() -> (Client, Receiver<OutputEvent>) {
2647 make_mocked_client_with_delay(0, true, false)
2648 }
2649
2650 fn make_mocked_client() -> (Client, Receiver<OutputEvent>) {
2651 make_mocked_client_with_delay(0, false, false)
2652 }
2653}