1use eval::Context;
2use parking_lot::RwLock;
3use std::io;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use tokio::runtime::Runtime;
8
9use launchdarkly_server_sdk_evaluation::{self as eval, Detail, FlagValue, PrerequisiteEvent};
10use serde::Serialize;
11use thiserror::Error;
12use tokio::sync::{broadcast, Semaphore};
13
14use super::config::Config;
15use super::data_source::{DataSource, EventReceived};
16use super::data_source_builders::BuildError as DataSourceError;
17use super::evaluation::{FlagDetail, FlagDetailConfig};
18use super::stores::store::DataStore;
19use super::stores::store_builders::BuildError as DataStoreError;
20use crate::config::BuildError as ConfigBuildError;
21use crate::events::event::EventFactory;
22use crate::events::event::InputEvent;
23use crate::events::processor::EventProcessor;
24use crate::events::processor_builders::BuildError as EventProcessorError;
25use crate::{MigrationOpTracker, Stage};
26
27struct EventsScope {
28 disabled: bool,
29 event_factory: EventFactory,
30 prerequisite_event_recorder: Box<dyn eval::PrerequisiteEventRecorder + Send + Sync>,
31}
32
33struct PrerequisiteEventRecorder {
34 event_factory: EventFactory,
35 event_processor: Arc<dyn EventProcessor>,
36}
37
38impl eval::PrerequisiteEventRecorder for PrerequisiteEventRecorder {
39 fn record(&self, event: PrerequisiteEvent) {
40 let evt = self.event_factory.new_eval_event(
41 &event.prerequisite_flag.key,
42 event.context.clone(),
43 &event.prerequisite_flag,
44 event.prerequisite_result,
45 FlagValue::Json(serde_json::Value::Null),
46 Some(event.target_flag_key),
47 );
48
49 self.event_processor.send(evt);
50 }
51}
52
53#[non_exhaustive]
55#[derive(Debug, Error)]
56pub enum BuildError {
57 #[error("invalid client config: {0}")]
59 InvalidConfig(String),
60}
61
62impl From<DataSourceError> for BuildError {
63 fn from(error: DataSourceError) -> Self {
64 Self::InvalidConfig(error.to_string())
65 }
66}
67
68impl From<DataStoreError> for BuildError {
69 fn from(error: DataStoreError) -> Self {
70 Self::InvalidConfig(error.to_string())
71 }
72}
73
74impl From<EventProcessorError> for BuildError {
75 fn from(error: EventProcessorError) -> Self {
76 Self::InvalidConfig(error.to_string())
77 }
78}
79
80impl From<ConfigBuildError> for BuildError {
81 fn from(error: ConfigBuildError) -> Self {
82 Self::InvalidConfig(error.to_string())
83 }
84}
85
86#[non_exhaustive]
88#[derive(Debug, Error)]
89pub enum StartError {
90 #[error("couldn't spawn background thread for client: {0}")]
92 SpawnFailed(io::Error),
93}
94
95#[derive(PartialEq, Copy, Clone, Debug)]
96enum ClientInitState {
97 Initializing = 0,
98 Initialized = 1,
99 InitializationFailed = 2,
100}
101
102impl PartialEq<usize> for ClientInitState {
103 fn eq(&self, other: &usize) -> bool {
104 *self as usize == *other
105 }
106}
107
108impl From<usize> for ClientInitState {
109 fn from(val: usize) -> Self {
110 match val {
111 0 => ClientInitState::Initializing,
112 1 => ClientInitState::Initialized,
113 2 => ClientInitState::InitializationFailed,
114 _ => unreachable!(),
115 }
116 }
117}
118
119pub struct Client {
149 event_processor: Arc<dyn EventProcessor>,
150 data_source: Arc<dyn DataSource>,
151 data_store: Arc<RwLock<dyn DataStore>>,
152 events_default: EventsScope,
153 events_with_reasons: EventsScope,
154 init_notify: Arc<Semaphore>,
155 init_state: Arc<AtomicUsize>,
156 started: AtomicBool,
157 offline: bool,
158 daemon_mode: bool,
159 sdk_key: String,
160 shutdown_broadcast: broadcast::Sender<()>,
161 runtime: RwLock<Option<Runtime>>,
162}
163
164impl Client {
165 pub fn build(config: Config) -> Result<Self, BuildError> {
167 if config.offline() {
168 info!("Started LaunchDarkly Client in offline mode");
169 } else if config.daemon_mode() {
170 info!("Started LaunchDarkly Client in daemon mode");
171 }
172
173 let tags = config.application_tag();
174
175 let endpoints = config.service_endpoints_builder().build()?;
176 let event_processor =
177 config
178 .event_processor_builder()
179 .build(&endpoints, config.sdk_key(), tags.clone())?;
180 let data_source =
181 config
182 .data_source_builder()
183 .build(&endpoints, config.sdk_key(), tags.clone())?;
184 let data_store = config.data_store_builder().build()?;
185
186 let events_default = EventsScope {
187 disabled: config.offline(),
188 event_factory: EventFactory::new(false),
189 prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
190 event_factory: EventFactory::new(false),
191 event_processor: event_processor.clone(),
192 }),
193 };
194
195 let events_with_reasons = EventsScope {
196 disabled: config.offline(),
197 event_factory: EventFactory::new(true),
198 prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
199 event_factory: EventFactory::new(true),
200 event_processor: event_processor.clone(),
201 }),
202 };
203
204 let (shutdown_tx, _) = broadcast::channel(1);
205
206 Ok(Client {
207 event_processor,
208 data_source,
209 data_store,
210 events_default,
211 events_with_reasons,
212 init_notify: Arc::new(Semaphore::new(0)),
213 init_state: Arc::new(AtomicUsize::new(ClientInitState::Initializing as usize)),
214 started: AtomicBool::new(false),
215 offline: config.offline(),
216 daemon_mode: config.daemon_mode(),
217 sdk_key: config.sdk_key().into(),
218 shutdown_broadcast: shutdown_tx,
219 runtime: RwLock::new(None),
220 })
221 }
222
223 pub fn start_with_default_executor_and_callback(&self, event_received: EventReceived) {
227 if self.started.load(Ordering::SeqCst) {
228 return;
229 }
230 self.started.store(true, Ordering::SeqCst);
231 self.start_with_default_executor_internal(event_received);
232 }
233
234 pub fn start_with_default_executor(&self) {
236 if self.started.load(Ordering::SeqCst) {
237 return;
238 }
239 self.started.store(true, Ordering::SeqCst);
240 self.start_with_default_executor_internal(Arc::new(move |_ev| {}));
241 }
242
243 fn start_with_default_executor_internal(&self, event_received: EventReceived) {
244 let notify = self.init_notify.clone();
248 let init_state = self.init_state.clone();
249
250 self.data_source.subscribe(
251 self.data_store.clone(),
252 Arc::new(move |success| {
253 init_state.store(
254 (if success {
255 ClientInitState::Initialized
256 } else {
257 ClientInitState::InitializationFailed
258 }) as usize,
259 Ordering::SeqCst,
260 );
261 notify.add_permits(1);
262 }),
263 event_received,
264 self.shutdown_broadcast.subscribe(),
265 );
266 }
267
268 pub fn start_with_runtime(&self) -> Result<bool, StartError> {
274 if self.started.load(Ordering::SeqCst) {
275 return Ok(true);
276 }
277 self.started.store(true, Ordering::SeqCst);
278
279 let runtime = Runtime::new().map_err(StartError::SpawnFailed)?;
280 let _guard = runtime.enter();
281 self.runtime.write().replace(runtime);
282
283 self.start_with_default_executor_internal(Arc::new(move |_ev| {}));
284
285 Ok(true)
286 }
287
288 #[deprecated(
292 note = "blocking without a timeout is discouraged, use wait_for_initialization instead"
293 )]
294 pub async fn initialized_async(&self) -> bool {
295 self.initialized_async_internal().await
296 }
297
298 pub async fn wait_for_initialization(&self, timeout: Duration) -> Option<bool> {
304 if timeout > Duration::from_secs(60) {
305 warn!("wait_for_initialization was configured to block for up to {} seconds. We recommend blocking no longer than 60 seconds.", timeout.as_secs());
306 }
307
308 let initialized = tokio::time::timeout(timeout, self.initialized_async_internal()).await;
309 initialized.ok()
310 }
311
312 async fn initialized_async_internal(&self) -> bool {
313 if self.offline || self.daemon_mode {
314 return true;
315 }
316
317 if ClientInitState::Initialized != self.init_state.load(Ordering::SeqCst) {
323 let _permit = self.init_notify.acquire().await;
324 }
325 ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
326 }
327
328 pub fn initialized(&self) -> bool {
332 self.offline
333 || self.daemon_mode
334 || ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
335 }
336
337 pub fn close(&self) {
341 self.event_processor.close();
342
343 if !self.offline && !self.daemon_mode {
346 if let Err(e) = self.shutdown_broadcast.send(()) {
347 error!("Failed to shutdown client appropriately: {}", e);
348 }
349 }
350
351 self.runtime.write().take();
354 }
355
356 pub fn flush(&self) {
364 self.event_processor.flush();
365 }
366
367 pub fn identify(&self, context: Context) {
372 if self.events_default.disabled {
373 return;
374 }
375
376 self.send_internal(self.events_default.event_factory.new_identify(context));
377 }
378
379 pub fn bool_variation(&self, context: &Context, flag_key: &str, default: bool) -> bool {
387 let val = self.variation(context, flag_key, default);
388 if let Some(b) = val.as_bool() {
389 b
390 } else {
391 warn!(
392 "bool_variation called for a non-bool flag {:?} (got {:?})",
393 flag_key, val
394 );
395 default
396 }
397 }
398
399 pub fn str_variation(&self, context: &Context, flag_key: &str, default: String) -> String {
407 let val = self.variation(context, flag_key, default.clone());
408 if let Some(s) = val.as_string() {
409 s
410 } else {
411 warn!(
412 "str_variation called for a non-string flag {:?} (got {:?})",
413 flag_key, val
414 );
415 default
416 }
417 }
418
419 pub fn float_variation(&self, context: &Context, flag_key: &str, default: f64) -> f64 {
427 let val = self.variation(context, flag_key, default);
428 if let Some(f) = val.as_float() {
429 f
430 } else {
431 warn!(
432 "float_variation called for a non-float flag {:?} (got {:?})",
433 flag_key, val
434 );
435 default
436 }
437 }
438
439 pub fn int_variation(&self, context: &Context, flag_key: &str, default: i64) -> i64 {
447 let val = self.variation(context, flag_key, default);
448 if let Some(f) = val.as_int() {
449 f
450 } else {
451 warn!(
452 "int_variation called for a non-int flag {:?} (got {:?})",
453 flag_key, val
454 );
455 default
456 }
457 }
458
459 pub fn json_variation(
469 &self,
470 context: &Context,
471 flag_key: &str,
472 default: serde_json::Value,
473 ) -> serde_json::Value {
474 self.variation(context, flag_key, default.clone())
475 .as_json()
476 .unwrap_or(default)
477 }
478
479 pub fn bool_variation_detail(
486 &self,
487 context: &Context,
488 flag_key: &str,
489 default: bool,
490 ) -> Detail<bool> {
491 self.variation_detail(context, flag_key, default).try_map(
492 |val| val.as_bool(),
493 default,
494 eval::Error::WrongType,
495 )
496 }
497
498 pub fn str_variation_detail(
505 &self,
506 context: &Context,
507 flag_key: &str,
508 default: String,
509 ) -> Detail<String> {
510 self.variation_detail(context, flag_key, default.clone())
511 .try_map(|val| val.as_string(), default, eval::Error::WrongType)
512 }
513
514 pub fn float_variation_detail(
521 &self,
522 context: &Context,
523 flag_key: &str,
524 default: f64,
525 ) -> Detail<f64> {
526 self.variation_detail(context, flag_key, default).try_map(
527 |val| val.as_float(),
528 default,
529 eval::Error::WrongType,
530 )
531 }
532
533 pub fn int_variation_detail(
540 &self,
541 context: &Context,
542 flag_key: &str,
543 default: i64,
544 ) -> Detail<i64> {
545 self.variation_detail(context, flag_key, default).try_map(
546 |val| val.as_int(),
547 default,
548 eval::Error::WrongType,
549 )
550 }
551
552 pub fn json_variation_detail(
559 &self,
560 context: &Context,
561 flag_key: &str,
562 default: serde_json::Value,
563 ) -> Detail<serde_json::Value> {
564 self.variation_detail(context, flag_key, default.clone())
565 .try_map(|val| val.as_json(), default, eval::Error::WrongType)
566 }
567
568 pub fn secure_mode_hash(&self, context: &Context) -> String {
573 let key = aws_lc_rs::hmac::Key::new(aws_lc_rs::hmac::HMAC_SHA256, self.sdk_key.as_bytes());
574 let tag = aws_lc_rs::hmac::sign(&key, context.canonical_key().as_bytes());
575
576 data_encoding::HEXLOWER.encode(tag.as_ref())
577 }
578
579 pub fn all_flags_detail(
590 &self,
591 context: &Context,
592 flag_state_config: FlagDetailConfig,
593 ) -> FlagDetail {
594 if self.offline {
595 warn!(
596 "all_flags_detail() called, but client is in offline mode. Returning empty state"
597 );
598 return FlagDetail::new(false);
599 }
600
601 if !self.initialized() {
602 warn!("all_flags_detail() called before client has finished initializing! Feature store unavailable - returning empty state");
603 return FlagDetail::new(false);
604 }
605
606 let data_store = self.data_store.read();
607
608 let mut flag_detail = FlagDetail::new(true);
609 flag_detail.populate(&*data_store, context, flag_state_config);
610
611 flag_detail
612 }
613
614 pub fn variation_detail<T: Into<FlagValue> + Clone>(
620 &self,
621 context: &Context,
622 flag_key: &str,
623 default: T,
624 ) -> Detail<FlagValue> {
625 let (detail, _) =
626 self.variation_internal(context, flag_key, default, &self.events_with_reasons);
627 detail
628 }
629
630 pub fn variation<T: Into<FlagValue> + Clone>(
641 &self,
642 context: &Context,
643 flag_key: &str,
644 default: T,
645 ) -> FlagValue {
646 let (detail, _) = self.variation_internal(context, flag_key, default, &self.events_default);
647 detail.value.unwrap()
648 }
649
650 pub fn migration_variation(
655 &self,
656 context: &Context,
657 flag_key: &str,
658 default_stage: Stage,
659 ) -> (Stage, Arc<Mutex<MigrationOpTracker>>) {
660 let (detail, flag) =
661 self.variation_internal(context, flag_key, default_stage, &self.events_default);
662
663 let migration_detail =
664 detail.try_map(|v| v.try_into().ok(), default_stage, eval::Error::WrongType);
665 let tracker = MigrationOpTracker::new(
666 flag_key.into(),
667 flag,
668 context.clone(),
669 migration_detail.clone(),
670 default_stage,
671 );
672
673 (
674 migration_detail.value.unwrap_or(default_stage),
675 Arc::new(Mutex::new(tracker)),
676 )
677 }
678
679 pub fn track_event(&self, context: Context, key: impl Into<String>) {
689 let _ = self.track(context, key, None, serde_json::Value::Null);
690 }
691
692 pub fn track_data(
705 &self,
706 context: Context,
707 key: impl Into<String>,
708 data: impl Serialize,
709 ) -> serde_json::Result<()> {
710 self.track(context, key, None, data)
711 }
712
713 pub fn track_metric(
724 &self,
725 context: Context,
726 key: impl Into<String>,
727 value: f64,
728 data: impl Serialize,
729 ) {
730 let _ = self.track(context, key, Some(value), data);
731 }
732
733 fn track(
734 &self,
735 context: Context,
736 key: impl Into<String>,
737 metric_value: Option<f64>,
738 data: impl Serialize,
739 ) -> serde_json::Result<()> {
740 if !self.events_default.disabled {
741 let event =
742 self.events_default
743 .event_factory
744 .new_custom(context, key, metric_value, data)?;
745
746 self.send_internal(event);
747 }
748
749 Ok(())
750 }
751
752 pub fn track_migration_op(&self, tracker: Arc<Mutex<MigrationOpTracker>>) {
759 if self.events_default.disabled {
760 return;
761 }
762
763 match tracker.lock() {
764 Ok(tracker) => {
765 let event = tracker.build();
766 match event {
767 Ok(event) => {
768 self.send_internal(
769 self.events_default.event_factory.new_migration_op(event),
770 );
771 }
772 Err(e) => error!(
773 "Failed to build migration event, no event will be sent: {}",
774 e
775 ),
776 }
777 }
778 Err(e) => error!(
779 "Failed to lock migration tracker, no event will be sent: {}",
780 e
781 ),
782 }
783 }
784
785 fn variation_internal<T: Into<FlagValue> + Clone>(
786 &self,
787 context: &Context,
788 flag_key: &str,
789 default: T,
790 events_scope: &EventsScope,
791 ) -> (Detail<FlagValue>, Option<eval::Flag>) {
792 if self.offline {
793 return (
794 Detail::err_default(eval::Error::ClientNotReady, default.into()),
795 None,
796 );
797 }
798
799 let (flag, result) = match self.initialized() {
800 false => (
801 None,
802 Detail::err_default(eval::Error::ClientNotReady, default.clone().into()),
803 ),
804 true => {
805 let data_store = self.data_store.read();
806 match data_store.flag(flag_key) {
807 Some(flag) => {
808 let result = eval::evaluate(
809 data_store.to_store(),
810 &flag,
811 context,
812 Some(&*events_scope.prerequisite_event_recorder),
813 )
814 .map(|v| v.clone())
815 .or(default.clone().into());
816
817 (Some(flag), result)
818 }
819 None => (
820 None,
821 Detail::err_default(eval::Error::FlagNotFound, default.clone().into()),
822 ),
823 }
824 }
825 };
826
827 if !events_scope.disabled {
828 let event = match &flag {
829 Some(f) => events_scope.event_factory.new_eval_event(
830 flag_key,
831 context.clone(),
832 f,
833 result.clone(),
834 default.into(),
835 None,
836 ),
837 None => events_scope.event_factory.new_unknown_flag_event(
838 flag_key,
839 context.clone(),
840 result.clone(),
841 default.into(),
842 ),
843 };
844 self.send_internal(event);
845 }
846
847 (result, flag)
848 }
849
850 fn send_internal(&self, event: InputEvent) {
851 self.event_processor.send(event);
852 }
853}
854
855#[cfg(test)]
856mod tests {
857 use assert_json_diff::assert_json_eq;
858 use crossbeam_channel::Receiver;
859 use eval::{ContextBuilder, MultiContextBuilder};
860 use futures::FutureExt;
861 use hyper::client::HttpConnector;
862 use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment};
863 use maplit::hashmap;
864 use std::collections::HashMap;
865 use tokio::time::Instant;
866
867 use crate::data_source::MockDataSource;
868 use crate::data_source_builders::MockDataSourceBuilder;
869 use crate::events::create_event_sender;
870 use crate::events::event::{OutputEvent, VariationKey};
871 use crate::events::processor_builders::EventProcessorBuilder;
872 use crate::stores::persistent_store::tests::InMemoryPersistentDataStore;
873 use crate::stores::store_types::{PatchTarget, StorageItem};
874 use crate::test_common::{
875 self, basic_flag, basic_flag_with_prereq, basic_flag_with_prereqs_and_visibility,
876 basic_flag_with_visibility, basic_int_flag, basic_migration_flag, basic_off_flag,
877 };
878 use crate::{
879 AllData, ConfigBuilder, MigratorBuilder, NullEventProcessorBuilder, Operation, Origin,
880 PersistentDataStore, PersistentDataStoreBuilder, PersistentDataStoreFactory,
881 SerializedItem,
882 };
883 use test_case::test_case;
884
885 use super::*;
886
887 fn is_send_and_sync<T: Send + Sync>() {}
888
889 #[test]
890 fn ensure_client_is_send_and_sync() {
891 is_send_and_sync::<Client>()
892 }
893
894 #[tokio::test]
895 async fn client_asynchronously_initializes() {
896 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
897 client.start_with_default_executor();
898
899 let now = Instant::now();
900 let initialized = client.initialized_async().await;
901 let elapsed_time = now.elapsed();
902 assert!(initialized);
903 assert!(elapsed_time.as_millis() > 500)
905 }
906
907 #[tokio::test]
908 async fn client_asynchronously_initializes_within_timeout() {
909 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
910 client.start_with_default_executor();
911
912 let now = Instant::now();
913 let initialized = client
914 .wait_for_initialization(Duration::from_millis(1500))
915 .await;
916 let elapsed_time = now.elapsed();
917 assert!(elapsed_time.as_millis() > 500);
919 assert_eq!(initialized, Some(true));
920 }
921
922 #[tokio::test]
923 async fn client_asynchronously_initializes_slower_than_timeout() {
924 let (client, _event_rx) = make_mocked_client_with_delay(2000, false, false);
925 client.start_with_default_executor();
926
927 let now = Instant::now();
928 let initialized = client
929 .wait_for_initialization(Duration::from_millis(500))
930 .await;
931 let elapsed_time = now.elapsed();
932 assert!(elapsed_time.as_millis() < 750);
934 assert!(initialized.is_none());
935 }
936
937 #[tokio::test]
938 async fn client_initializes_immediately_in_offline_mode() {
939 let (client, _event_rx) = make_mocked_client_with_delay(1000, true, false);
940 client.start_with_default_executor();
941
942 assert!(client.initialized());
943
944 let now = Instant::now();
945 let initialized = client
946 .wait_for_initialization(Duration::from_millis(2000))
947 .await;
948 let elapsed_time = now.elapsed();
949 assert_eq!(initialized, Some(true));
950 assert!(elapsed_time.as_millis() < 500)
951 }
952
953 #[tokio::test]
954 async fn client_initializes_immediately_in_daemon_mode() {
955 let (client, _event_rx) = make_mocked_client_with_delay(1000, false, true);
956 client.start_with_default_executor();
957
958 assert!(client.initialized());
959
960 let now = Instant::now();
961 let initialized = client
962 .wait_for_initialization(Duration::from_millis(2000))
963 .await;
964 let elapsed_time = now.elapsed();
965 assert_eq!(initialized, Some(true));
966 assert!(elapsed_time.as_millis() < 500)
967 }
968
969 #[test_case(basic_flag("myFlag"), false.into(), true.into())]
970 #[test_case(basic_int_flag("myFlag"), 0.into(), test_common::FLOAT_TO_INT_MAX.into())]
971 fn client_updates_changes_evaluation_results(
972 flag: eval::Flag,
973 default: FlagValue,
974 expected: FlagValue,
975 ) {
976 let context = ContextBuilder::new("foo")
977 .build()
978 .expect("Failed to create context");
979
980 let (client, _event_rx) = make_mocked_client();
981
982 let result = client.variation_detail(&context, "myFlag", default.clone());
983 assert_eq!(result.value.unwrap(), default);
984
985 client.start_with_default_executor();
986 client
987 .data_store
988 .write()
989 .upsert(
990 &flag.key,
991 PatchTarget::Flag(StorageItem::Item(flag.clone())),
992 )
993 .expect("patch should apply");
994
995 let result = client.variation_detail(&context, "myFlag", default);
996 assert_eq!(result.value.unwrap(), expected);
997 assert!(matches!(
998 result.reason,
999 Reason::Fallthrough {
1000 in_experiment: false
1001 }
1002 ));
1003 }
1004
1005 #[test]
1006 fn all_flags_detail_is_invalid_when_offline() {
1007 let (client, _event_rx) = make_mocked_offline_client();
1008 client.start_with_default_executor();
1009
1010 let context = ContextBuilder::new("bob")
1011 .build()
1012 .expect("Failed to create context");
1013
1014 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1015 assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1016 }
1017
1018 #[test]
1019 fn all_flags_detail_is_invalid_when_not_initialized() {
1020 let (client, _event_rx) = make_mocked_client();
1021
1022 let context = ContextBuilder::new("bob")
1023 .build()
1024 .expect("Failed to create context");
1025
1026 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1027 assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1028 }
1029
1030 #[test]
1031 fn all_flags_detail_returns_flag_states() {
1032 let (client, _event_rx) = make_mocked_client();
1033 client.start_with_default_executor();
1034 client
1035 .data_store
1036 .write()
1037 .upsert(
1038 "myFlag1",
1039 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag1"))),
1040 )
1041 .expect("patch should apply");
1042 client
1043 .data_store
1044 .write()
1045 .upsert(
1046 "myFlag2",
1047 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag2"))),
1048 )
1049 .expect("patch should apply");
1050 let context = ContextBuilder::new("bob")
1051 .build()
1052 .expect("Failed to create context");
1053
1054 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1055
1056 client.close();
1057
1058 assert_json_eq!(
1059 all_flags,
1060 json!({
1061 "myFlag1": true,
1062 "myFlag2": true,
1063 "$flagsState": {
1064 "myFlag1": {
1065 "version": 42,
1066 "variation": 1
1067 },
1068 "myFlag2": {
1069 "version": 42,
1070 "variation": 1
1071 },
1072 },
1073 "$valid": true
1074 })
1075 );
1076 }
1077
1078 #[test]
1079 fn all_flags_detail_returns_prerequisite_relations() {
1080 let (client, _event_rx) = make_mocked_client();
1081 client.start_with_default_executor();
1082 client
1083 .data_store
1084 .write()
1085 .upsert(
1086 "prereq1",
1087 PatchTarget::Flag(StorageItem::Item(basic_flag("prereq1"))),
1088 )
1089 .expect("patch should apply");
1090 client
1091 .data_store
1092 .write()
1093 .upsert(
1094 "prereq2",
1095 PatchTarget::Flag(StorageItem::Item(basic_flag("prereq2"))),
1096 )
1097 .expect("patch should apply");
1098
1099 client
1100 .data_store
1101 .write()
1102 .upsert(
1103 "toplevel",
1104 PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1105 "toplevel",
1106 &["prereq1", "prereq2"],
1107 false,
1108 ))),
1109 )
1110 .expect("patch should apply");
1111
1112 let context = ContextBuilder::new("bob")
1113 .build()
1114 .expect("Failed to create context");
1115
1116 let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1117
1118 client.close();
1119
1120 assert_json_eq!(
1121 all_flags,
1122 json!({
1123 "prereq1": true,
1124 "prereq2": true,
1125 "toplevel": true,
1126 "$flagsState": {
1127 "toplevel": {
1128 "version": 42,
1129 "variation": 1,
1130 "prerequisites": ["prereq1", "prereq2"]
1131 },
1132 "prereq1": {
1133 "version": 42,
1134 "variation": 1
1135 },
1136 "prereq2": {
1137 "version": 42,
1138 "variation": 1
1139 },
1140 },
1141 "$valid": true
1142 })
1143 );
1144 }
1145
1146 #[test]
1147 fn all_flags_detail_returns_prerequisite_relations_when_not_visible_to_clients() {
1148 let (client, _event_rx) = make_mocked_client();
1149 client.start_with_default_executor();
1150 client
1151 .data_store
1152 .write()
1153 .upsert(
1154 "prereq1",
1155 PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1156 "prereq1", false,
1157 ))),
1158 )
1159 .expect("patch should apply");
1160 client
1161 .data_store
1162 .write()
1163 .upsert(
1164 "prereq2",
1165 PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1166 "prereq2", false,
1167 ))),
1168 )
1169 .expect("patch should apply");
1170
1171 client
1172 .data_store
1173 .write()
1174 .upsert(
1175 "toplevel",
1176 PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1177 "toplevel",
1178 &["prereq1", "prereq2"],
1179 true,
1180 ))),
1181 )
1182 .expect("patch should apply");
1183
1184 let context = ContextBuilder::new("bob")
1185 .build()
1186 .expect("Failed to create context");
1187
1188 let mut config = FlagDetailConfig::new();
1189 config.client_side_only();
1190
1191 let all_flags = client.all_flags_detail(&context, config);
1192
1193 client.close();
1194
1195 assert_json_eq!(
1196 all_flags,
1197 json!({
1198 "toplevel": true,
1199 "$flagsState": {
1200 "toplevel": {
1201 "version": 42,
1202 "variation": 1,
1203 "prerequisites": ["prereq1", "prereq2"]
1204 },
1205 },
1206 "$valid": true
1207 })
1208 );
1209 }
1210
1211 #[test]
1212 fn variation_tracks_events_correctly() {
1213 let (client, event_rx) = make_mocked_client();
1214 client.start_with_default_executor();
1215 client
1216 .data_store
1217 .write()
1218 .upsert(
1219 "myFlag",
1220 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1221 )
1222 .expect("patch should apply");
1223 let context = ContextBuilder::new("bob")
1224 .build()
1225 .expect("Failed to create context");
1226
1227 let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1228
1229 assert!(flag_value.as_bool().unwrap());
1230 client.flush();
1231 client.close();
1232
1233 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1234 assert_eq!(events.len(), 2);
1235 assert_eq!(events[0].kind(), "index");
1236 assert_eq!(events[1].kind(), "summary");
1237
1238 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1239 let variation_key = VariationKey {
1240 version: Some(42),
1241 variation: Some(1),
1242 };
1243 let feature = event_summary.features.get("myFlag");
1244 assert!(feature.is_some());
1245
1246 let feature = feature.unwrap();
1247 assert!(feature.counters.contains_key(&variation_key));
1248 } else {
1249 panic!("Event should be a summary type");
1250 }
1251 }
1252
1253 #[test]
1254 fn variation_handles_offline_mode() {
1255 let (client, event_rx) = make_mocked_offline_client();
1256 client.start_with_default_executor();
1257
1258 let context = ContextBuilder::new("bob")
1259 .build()
1260 .expect("Failed to create context");
1261 let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1262
1263 assert!(!flag_value.as_bool().unwrap());
1264 client.flush();
1265 client.close();
1266
1267 assert_eq!(event_rx.iter().count(), 0);
1268 }
1269
1270 #[test]
1271 fn variation_handles_unknown_flags() {
1272 let (client, event_rx) = make_mocked_client();
1273 client.start_with_default_executor();
1274 let context = ContextBuilder::new("bob")
1275 .build()
1276 .expect("Failed to create context");
1277
1278 let flag_value = client.variation(&context, "non-existent-flag", FlagValue::Bool(false));
1279
1280 assert!(!flag_value.as_bool().unwrap());
1281 client.flush();
1282 client.close();
1283
1284 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1285 assert_eq!(events.len(), 2);
1286 assert_eq!(events[0].kind(), "index");
1287 assert_eq!(events[1].kind(), "summary");
1288
1289 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1290 let variation_key = VariationKey {
1291 version: None,
1292 variation: None,
1293 };
1294
1295 let feature = event_summary.features.get("non-existent-flag");
1296 assert!(feature.is_some());
1297
1298 let feature = feature.unwrap();
1299 assert!(feature.counters.contains_key(&variation_key));
1300 } else {
1301 panic!("Event should be a summary type");
1302 }
1303 }
1304
1305 #[test]
1306 fn variation_detail_handles_debug_events_correctly() {
1307 let (client, event_rx) = make_mocked_client();
1308 client.start_with_default_executor();
1309
1310 let mut flag = basic_flag("myFlag");
1311 flag.debug_events_until_date = Some(64_060_606_800_000); client
1314 .data_store
1315 .write()
1316 .upsert(
1317 &flag.key,
1318 PatchTarget::Flag(StorageItem::Item(flag.clone())),
1319 )
1320 .expect("patch should apply");
1321 let context = ContextBuilder::new("bob")
1322 .build()
1323 .expect("Failed to create context");
1324
1325 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1326
1327 assert!(detail.value.unwrap().as_bool().unwrap());
1328 assert!(matches!(
1329 detail.reason,
1330 Reason::Fallthrough {
1331 in_experiment: false
1332 }
1333 ));
1334 client.flush();
1335 client.close();
1336
1337 let events = event_rx.try_iter().collect::<Vec<OutputEvent>>();
1338 assert_eq!(events.len(), 3);
1339 assert_eq!(events[0].kind(), "index");
1340 assert_eq!(events[1].kind(), "debug");
1341 assert_eq!(events[2].kind(), "summary");
1342
1343 if let OutputEvent::Summary(event_summary) = events[2].clone() {
1344 let variation_key = VariationKey {
1345 version: Some(42),
1346 variation: Some(1),
1347 };
1348
1349 let feature = event_summary.features.get("myFlag");
1350 assert!(feature.is_some());
1351
1352 let feature = feature.unwrap();
1353 assert!(feature.counters.contains_key(&variation_key));
1354 } else {
1355 panic!("Event should be a summary type");
1356 }
1357 }
1358
1359 #[test]
1360 fn variation_detail_tracks_events_correctly() {
1361 let (client, event_rx) = make_mocked_client();
1362 client.start_with_default_executor();
1363
1364 client
1365 .data_store
1366 .write()
1367 .upsert(
1368 "myFlag",
1369 PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1370 )
1371 .expect("patch should apply");
1372 let context = ContextBuilder::new("bob")
1373 .build()
1374 .expect("Failed to create context");
1375
1376 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1377
1378 assert!(detail.value.unwrap().as_bool().unwrap());
1379 assert!(matches!(
1380 detail.reason,
1381 Reason::Fallthrough {
1382 in_experiment: false
1383 }
1384 ));
1385 client.flush();
1386 client.close();
1387
1388 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1389 assert_eq!(events.len(), 2);
1390 assert_eq!(events[0].kind(), "index");
1391 assert_eq!(events[1].kind(), "summary");
1392
1393 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1394 let variation_key = VariationKey {
1395 version: Some(42),
1396 variation: Some(1),
1397 };
1398
1399 let feature = event_summary.features.get("myFlag");
1400 assert!(feature.is_some());
1401
1402 let feature = feature.unwrap();
1403 assert!(feature.counters.contains_key(&variation_key));
1404 } else {
1405 panic!("Event should be a summary type");
1406 }
1407 }
1408
1409 #[test]
1410 fn variation_detail_handles_offline_mode() {
1411 let (client, event_rx) = make_mocked_offline_client();
1412 client.start_with_default_executor();
1413
1414 let context = ContextBuilder::new("bob")
1415 .build()
1416 .expect("Failed to create context");
1417
1418 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1419
1420 assert!(!detail.value.unwrap().as_bool().unwrap());
1421 assert!(matches!(
1422 detail.reason,
1423 Reason::Error {
1424 error: eval::Error::ClientNotReady
1425 }
1426 ));
1427 client.flush();
1428 client.close();
1429
1430 assert_eq!(event_rx.iter().count(), 0);
1431 }
1432
1433 struct InMemoryPersistentDataStoreFactory {
1434 data: AllData<Flag, Segment>,
1435 initialized: bool,
1436 }
1437
1438 impl PersistentDataStoreFactory for InMemoryPersistentDataStoreFactory {
1439 fn create_persistent_data_store(
1440 &self,
1441 ) -> Result<Box<(dyn PersistentDataStore + 'static)>, std::io::Error> {
1442 let serialized_data =
1443 AllData::<SerializedItem, SerializedItem>::try_from(self.data.clone())?;
1444 Ok(Box::new(InMemoryPersistentDataStore {
1445 data: serialized_data,
1446 initialized: self.initialized,
1447 }))
1448 }
1449 }
1450
1451 #[test]
1452 fn variation_detail_handles_daemon_mode() {
1453 testing_logger::setup();
1454 let factory = InMemoryPersistentDataStoreFactory {
1455 data: AllData {
1456 flags: hashmap!["flag".into() => basic_flag("flag")],
1457 segments: HashMap::new(),
1458 },
1459 initialized: true,
1460 };
1461 let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1462
1463 let config = ConfigBuilder::new("sdk-key")
1464 .daemon_mode(true)
1465 .data_store(&builder)
1466 .event_processor(&NullEventProcessorBuilder::new())
1467 .build()
1468 .expect("config should build");
1469
1470 let client = Client::build(config).expect("Should be built.");
1471
1472 client.start_with_default_executor();
1473
1474 let context = ContextBuilder::new("bob")
1475 .build()
1476 .expect("Failed to create context");
1477
1478 let detail = client.variation_detail(&context, "flag", FlagValue::Bool(false));
1479
1480 assert!(detail.value.unwrap().as_bool().unwrap());
1481 assert!(matches!(
1482 detail.reason,
1483 Reason::Fallthrough {
1484 in_experiment: false
1485 }
1486 ));
1487 client.flush();
1488 client.close();
1489
1490 testing_logger::validate(|captured_logs| {
1491 assert_eq!(captured_logs.len(), 1);
1492 assert_eq!(
1493 captured_logs[0].body,
1494 "Started LaunchDarkly Client in daemon mode"
1495 );
1496 });
1497 }
1498
1499 #[test]
1500 fn daemon_mode_is_quiet_if_store_is_not_initialized() {
1501 testing_logger::setup();
1502
1503 let factory = InMemoryPersistentDataStoreFactory {
1504 data: AllData {
1505 flags: HashMap::new(),
1506 segments: HashMap::new(),
1507 },
1508 initialized: false,
1509 };
1510 let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1511
1512 let config = ConfigBuilder::new("sdk-key")
1513 .daemon_mode(true)
1514 .data_store(&builder)
1515 .event_processor(&NullEventProcessorBuilder::new())
1516 .build()
1517 .expect("config should build");
1518
1519 let client = Client::build(config).expect("Should be built.");
1520
1521 client.start_with_default_executor();
1522
1523 let context = ContextBuilder::new("bob")
1524 .build()
1525 .expect("Failed to create context");
1526
1527 client.variation_detail(&context, "flag", FlagValue::Bool(false));
1528
1529 testing_logger::validate(|captured_logs| {
1530 assert_eq!(captured_logs.len(), 1);
1531 assert_eq!(
1532 captured_logs[0].body,
1533 "Started LaunchDarkly Client in daemon mode"
1534 );
1535 });
1536 }
1537
1538 #[test]
1539 fn variation_handles_off_flag_without_variation() {
1540 let (client, event_rx) = make_mocked_client();
1541 client.start_with_default_executor();
1542
1543 client
1544 .data_store
1545 .write()
1546 .upsert(
1547 "myFlag",
1548 PatchTarget::Flag(StorageItem::Item(basic_off_flag("myFlag"))),
1549 )
1550 .expect("patch should apply");
1551 let context = ContextBuilder::new("bob")
1552 .build()
1553 .expect("Failed to create context");
1554
1555 let result = client.variation(&context, "myFlag", FlagValue::Bool(false));
1556
1557 assert!(!result.as_bool().unwrap());
1558 client.flush();
1559 client.close();
1560
1561 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1562 assert_eq!(events.len(), 2);
1563 assert_eq!(events[0].kind(), "index");
1564 assert_eq!(events[1].kind(), "summary");
1565
1566 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1567 let variation_key = VariationKey {
1568 version: Some(42),
1569 variation: None,
1570 };
1571 let feature = event_summary.features.get("myFlag");
1572 assert!(feature.is_some());
1573
1574 let feature = feature.unwrap();
1575 assert!(feature.counters.contains_key(&variation_key));
1576 } else {
1577 panic!("Event should be a summary type");
1578 }
1579 }
1580
1581 #[test]
1582 fn variation_detail_tracks_prereq_events_correctly() {
1583 let (client, event_rx) = make_mocked_client();
1584 client.start_with_default_executor();
1585
1586 let mut basic_preqreq_flag = basic_flag("prereqFlag");
1587 basic_preqreq_flag.track_events = true;
1588
1589 client
1590 .data_store
1591 .write()
1592 .upsert(
1593 "prereqFlag",
1594 PatchTarget::Flag(StorageItem::Item(basic_preqreq_flag)),
1595 )
1596 .expect("patch should apply");
1597
1598 let mut basic_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1599 basic_flag.track_events = true;
1600 client
1601 .data_store
1602 .write()
1603 .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(basic_flag)))
1604 .expect("patch should apply");
1605 let context = ContextBuilder::new("bob")
1606 .build()
1607 .expect("Failed to create context");
1608
1609 let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1610
1611 assert!(detail.value.unwrap().as_bool().unwrap());
1612 assert!(matches!(
1613 detail.reason,
1614 Reason::Fallthrough {
1615 in_experiment: false
1616 }
1617 ));
1618 client.flush();
1619 client.close();
1620
1621 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1622 assert_eq!(events.len(), 4);
1623 assert_eq!(events[0].kind(), "index");
1624 assert_eq!(events[1].kind(), "feature");
1625 assert_eq!(events[2].kind(), "feature");
1626 assert_eq!(events[3].kind(), "summary");
1627
1628 if let OutputEvent::Summary(event_summary) = events[3].clone() {
1629 let variation_key = VariationKey {
1630 version: Some(42),
1631 variation: Some(1),
1632 };
1633 let feature = event_summary.features.get("myFlag");
1634 assert!(feature.is_some());
1635
1636 let feature = feature.unwrap();
1637 assert!(feature.counters.contains_key(&variation_key));
1638
1639 let variation_key = VariationKey {
1640 version: Some(42),
1641 variation: Some(1),
1642 };
1643 let feature = event_summary.features.get("prereqFlag");
1644 assert!(feature.is_some());
1645
1646 let feature = feature.unwrap();
1647 assert!(feature.counters.contains_key(&variation_key));
1648 }
1649 }
1650
1651 #[test]
1652 fn variation_handles_failed_prereqs_correctly() {
1653 let (client, event_rx) = make_mocked_client();
1654 client.start_with_default_executor();
1655
1656 let mut basic_preqreq_flag = basic_off_flag("prereqFlag");
1657 basic_preqreq_flag.track_events = true;
1658
1659 client
1660 .data_store
1661 .write()
1662 .upsert(
1663 "prereqFlag",
1664 PatchTarget::Flag(StorageItem::Item(basic_preqreq_flag)),
1665 )
1666 .expect("patch should apply");
1667
1668 let mut basic_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1669 basic_flag.track_events = true;
1670 client
1671 .data_store
1672 .write()
1673 .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(basic_flag)))
1674 .expect("patch should apply");
1675 let context = ContextBuilder::new("bob")
1676 .build()
1677 .expect("Failed to create context");
1678
1679 let detail = client.variation(&context, "myFlag", FlagValue::Bool(false));
1680
1681 assert!(!detail.as_bool().unwrap());
1682 client.flush();
1683 client.close();
1684
1685 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1686 assert_eq!(events.len(), 4);
1687 assert_eq!(events[0].kind(), "index");
1688 assert_eq!(events[1].kind(), "feature");
1689 assert_eq!(events[2].kind(), "feature");
1690 assert_eq!(events[3].kind(), "summary");
1691
1692 if let OutputEvent::Summary(event_summary) = events[3].clone() {
1693 let variation_key = VariationKey {
1694 version: Some(42),
1695 variation: Some(0),
1696 };
1697 let feature = event_summary.features.get("myFlag");
1698 assert!(feature.is_some());
1699
1700 let feature = feature.unwrap();
1701 assert!(feature.counters.contains_key(&variation_key));
1702
1703 let variation_key = VariationKey {
1704 version: Some(42),
1705 variation: None,
1706 };
1707 let feature = event_summary.features.get("prereqFlag");
1708 assert!(feature.is_some());
1709
1710 let feature = feature.unwrap();
1711 assert!(feature.counters.contains_key(&variation_key));
1712 }
1713 }
1714
1715 #[test]
1716 fn variation_detail_handles_flag_not_found() {
1717 let (client, event_rx) = make_mocked_client();
1718 client.start_with_default_executor();
1719
1720 let context = ContextBuilder::new("bob")
1721 .build()
1722 .expect("Failed to create context");
1723 let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1724
1725 assert!(!detail.value.unwrap().as_bool().unwrap());
1726 assert!(matches!(
1727 detail.reason,
1728 Reason::Error {
1729 error: eval::Error::FlagNotFound
1730 }
1731 ));
1732 client.flush();
1733 client.close();
1734
1735 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1736 assert_eq!(events.len(), 2);
1737 assert_eq!(events[0].kind(), "index");
1738 assert_eq!(events[1].kind(), "summary");
1739
1740 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1741 let variation_key = VariationKey {
1742 version: None,
1743 variation: None,
1744 };
1745 let feature = event_summary.features.get("non-existent-flag");
1746 assert!(feature.is_some());
1747
1748 let feature = feature.unwrap();
1749 assert!(feature.counters.contains_key(&variation_key));
1750 } else {
1751 panic!("Event should be a summary type");
1752 }
1753 }
1754
1755 #[tokio::test]
1756 async fn variation_detail_handles_client_not_ready() {
1757 let (client, event_rx) = make_mocked_client_with_delay(u64::MAX, false, false);
1758 client.start_with_default_executor();
1759 let context = ContextBuilder::new("bob")
1760 .build()
1761 .expect("Failed to create context");
1762
1763 let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1764
1765 assert!(!detail.value.unwrap().as_bool().unwrap());
1766 assert!(matches!(
1767 detail.reason,
1768 Reason::Error {
1769 error: eval::Error::ClientNotReady
1770 }
1771 ));
1772 client.flush();
1773 client.close();
1774
1775 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1776 assert_eq!(events.len(), 2);
1777 assert_eq!(events[0].kind(), "index");
1778 assert_eq!(events[1].kind(), "summary");
1779
1780 if let OutputEvent::Summary(event_summary) = events[1].clone() {
1781 let variation_key = VariationKey {
1782 version: None,
1783 variation: None,
1784 };
1785 let feature = event_summary.features.get("non-existent-flag");
1786 assert!(feature.is_some());
1787
1788 let feature = feature.unwrap();
1789 assert!(feature.counters.contains_key(&variation_key));
1790 } else {
1791 panic!("Event should be a summary type");
1792 }
1793 }
1794
1795 #[test]
1796 fn identify_sends_identify_event() {
1797 let (client, event_rx) = make_mocked_client();
1798 client.start_with_default_executor();
1799
1800 let context = ContextBuilder::new("bob")
1801 .build()
1802 .expect("Failed to create context");
1803
1804 client.identify(context);
1805 client.flush();
1806 client.close();
1807
1808 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1809 assert_eq!(events.len(), 1);
1810 assert_eq!(events[0].kind(), "identify");
1811 }
1812
1813 #[test]
1814 fn identify_sends_sends_nothing_in_offline_mode() {
1815 let (client, event_rx) = make_mocked_offline_client();
1816 client.start_with_default_executor();
1817
1818 let context = ContextBuilder::new("bob")
1819 .build()
1820 .expect("Failed to create context");
1821
1822 client.identify(context);
1823 client.flush();
1824 client.close();
1825
1826 assert_eq!(event_rx.iter().count(), 0);
1827 }
1828
1829 #[test]
1830 fn secure_mode_hash() {
1831 let config = ConfigBuilder::new("secret")
1832 .offline(true)
1833 .build()
1834 .expect("config should build");
1835 let client = Client::build(config).expect("Should be built.");
1836 let context = ContextBuilder::new("Message")
1837 .build()
1838 .expect("Failed to create context");
1839
1840 assert_eq!(
1841 client.secure_mode_hash(&context),
1842 "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597"
1843 );
1844 }
1845
1846 #[test]
1847 fn secure_mode_hash_with_multi_kind() {
1848 let config = ConfigBuilder::new("secret")
1849 .offline(true)
1850 .build()
1851 .expect("config should build");
1852 let client = Client::build(config).expect("Should be built.");
1853
1854 let org = ContextBuilder::new("org-key|1")
1855 .kind("org")
1856 .build()
1857 .expect("Failed to create context");
1858 let user = ContextBuilder::new("user-key:2")
1859 .build()
1860 .expect("Failed to create context");
1861
1862 let context = MultiContextBuilder::new()
1863 .add_context(org)
1864 .add_context(user)
1865 .build()
1866 .expect("failed to build multi-context");
1867
1868 assert_eq!(
1869 client.secure_mode_hash(&context),
1870 "5687e6383b920582ed50c2a96c98a115f1b6aad85a60579d761d9b8797415163"
1871 );
1872 }
1873
1874 #[derive(Serialize)]
1875 struct MyCustomData {
1876 pub answer: u32,
1877 }
1878
1879 #[test]
1880 fn track_sends_track_and_index_events() -> serde_json::Result<()> {
1881 let (client, event_rx) = make_mocked_client();
1882 client.start_with_default_executor();
1883
1884 let context = ContextBuilder::new("bob")
1885 .build()
1886 .expect("Failed to create context");
1887
1888 client.track_event(context.clone(), "event-with-null");
1889 client.track_data(context.clone(), "event-with-string", "string-data")?;
1890 client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1891 client.track_data(
1892 context.clone(),
1893 "event-with-struct",
1894 MyCustomData { answer: 42 },
1895 )?;
1896 client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1897
1898 client.flush();
1899 client.close();
1900
1901 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1902 assert_eq!(events.len(), 6);
1903
1904 let mut events_by_type: HashMap<&str, usize> = HashMap::new();
1905 for event in events {
1906 if let Some(count) = events_by_type.get_mut(event.kind()) {
1907 *count += 1;
1908 } else {
1909 events_by_type.insert(event.kind(), 1);
1910 }
1911 }
1912 assert!(matches!(events_by_type.get("index"), Some(1)));
1913 assert!(matches!(events_by_type.get("custom"), Some(5)));
1914
1915 Ok(())
1916 }
1917
1918 #[test]
1919 fn track_sends_nothing_in_offline_mode() -> serde_json::Result<()> {
1920 let (client, event_rx) = make_mocked_offline_client();
1921 client.start_with_default_executor();
1922
1923 let context = ContextBuilder::new("bob")
1924 .build()
1925 .expect("Failed to create context");
1926
1927 client.track_event(context.clone(), "event-with-null");
1928 client.track_data(context.clone(), "event-with-string", "string-data")?;
1929 client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1930 client.track_data(
1931 context.clone(),
1932 "event-with-struct",
1933 MyCustomData { answer: 42 },
1934 )?;
1935 client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1936
1937 client.flush();
1938 client.close();
1939
1940 assert_eq!(event_rx.iter().count(), 0);
1941
1942 Ok(())
1943 }
1944
1945 #[test]
1946 fn migration_handles_flag_not_found() {
1947 let (client, _event_rx) = make_mocked_client();
1948 client.start_with_default_executor();
1949
1950 let context = ContextBuilder::new("bob")
1951 .build()
1952 .expect("Failed to create context");
1953
1954 let (stage, _tracker) =
1955 client.migration_variation(&context, "non-existent-flag-key", Stage::Off);
1956
1957 assert_eq!(stage, Stage::Off);
1958 }
1959
1960 #[test]
1961 fn migration_uses_non_migration_flag() {
1962 let (client, _event_rx) = make_mocked_client();
1963 client.start_with_default_executor();
1964 client
1965 .data_store
1966 .write()
1967 .upsert(
1968 "boolean-flag",
1969 PatchTarget::Flag(StorageItem::Item(basic_flag("boolean-flag"))),
1970 )
1971 .expect("patch should apply");
1972
1973 let context = ContextBuilder::new("bob")
1974 .build()
1975 .expect("Failed to create context");
1976
1977 let (stage, _tracker) = client.migration_variation(&context, "boolean-flag", Stage::Off);
1978
1979 assert_eq!(stage, Stage::Off);
1980 }
1981
1982 #[test_case(Stage::Off)]
1983 #[test_case(Stage::DualWrite)]
1984 #[test_case(Stage::Shadow)]
1985 #[test_case(Stage::Live)]
1986 #[test_case(Stage::Rampdown)]
1987 #[test_case(Stage::Complete)]
1988 fn migration_can_determine_correct_stage_from_flag(stage: Stage) {
1989 let (client, _event_rx) = make_mocked_client();
1990 client.start_with_default_executor();
1991 client
1992 .data_store
1993 .write()
1994 .upsert(
1995 "stage-flag",
1996 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
1997 )
1998 .expect("patch should apply");
1999
2000 let context = ContextBuilder::new("bob")
2001 .build()
2002 .expect("Failed to create context");
2003
2004 let (evaluated_stage, _tracker) =
2005 client.migration_variation(&context, "stage-flag", Stage::Off);
2006
2007 assert_eq!(evaluated_stage, stage);
2008 }
2009
2010 #[tokio::test]
2011 async fn migration_tracks_invoked_correctly() {
2012 migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Read, vec![Origin::Old])
2013 .await;
2014 migration_tracks_invoked_correctly_driver(
2015 Stage::DualWrite,
2016 Operation::Read,
2017 vec![Origin::Old],
2018 )
2019 .await;
2020 migration_tracks_invoked_correctly_driver(
2021 Stage::Shadow,
2022 Operation::Read,
2023 vec![Origin::Old, Origin::New],
2024 )
2025 .await;
2026 migration_tracks_invoked_correctly_driver(
2027 Stage::Live,
2028 Operation::Read,
2029 vec![Origin::Old, Origin::New],
2030 )
2031 .await;
2032 migration_tracks_invoked_correctly_driver(
2033 Stage::Rampdown,
2034 Operation::Read,
2035 vec![Origin::New],
2036 )
2037 .await;
2038 migration_tracks_invoked_correctly_driver(
2039 Stage::Complete,
2040 Operation::Read,
2041 vec![Origin::New],
2042 )
2043 .await;
2044 migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Write, vec![Origin::Old])
2045 .await;
2046 migration_tracks_invoked_correctly_driver(
2047 Stage::DualWrite,
2048 Operation::Write,
2049 vec![Origin::Old, Origin::New],
2050 )
2051 .await;
2052 migration_tracks_invoked_correctly_driver(
2053 Stage::Shadow,
2054 Operation::Write,
2055 vec![Origin::Old, Origin::New],
2056 )
2057 .await;
2058 migration_tracks_invoked_correctly_driver(
2059 Stage::Live,
2060 Operation::Write,
2061 vec![Origin::Old, Origin::New],
2062 )
2063 .await;
2064 migration_tracks_invoked_correctly_driver(
2065 Stage::Rampdown,
2066 Operation::Write,
2067 vec![Origin::Old, Origin::New],
2068 )
2069 .await;
2070 migration_tracks_invoked_correctly_driver(
2071 Stage::Complete,
2072 Operation::Write,
2073 vec![Origin::New],
2074 )
2075 .await;
2076 }
2077
2078 async fn migration_tracks_invoked_correctly_driver(
2079 stage: Stage,
2080 operation: Operation,
2081 origins: Vec<Origin>,
2082 ) {
2083 let (client, event_rx) = make_mocked_client();
2084 let client = Arc::new(client);
2085 client.start_with_default_executor();
2086 client
2087 .data_store
2088 .write()
2089 .upsert(
2090 "stage-flag",
2091 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2092 )
2093 .expect("patch should apply");
2094
2095 let mut migrator = MigratorBuilder::new(client.clone())
2096 .read(
2097 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2098 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2099 Some(|_, _| true),
2100 )
2101 .write(
2102 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2103 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2104 )
2105 .build()
2106 .expect("migrator should build");
2107
2108 let context = ContextBuilder::new("bob")
2109 .build()
2110 .expect("Failed to create context");
2111
2112 if let Operation::Read = operation {
2113 migrator
2114 .read(
2115 &context,
2116 "stage-flag".into(),
2117 Stage::Off,
2118 serde_json::Value::Null,
2119 )
2120 .await;
2121 } else {
2122 migrator
2123 .write(
2124 &context,
2125 "stage-flag".into(),
2126 Stage::Off,
2127 serde_json::Value::Null,
2128 )
2129 .await;
2130 }
2131
2132 client.flush();
2133 client.close();
2134
2135 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2136 assert_eq!(events.len(), 3);
2137 match &events[1] {
2138 OutputEvent::MigrationOp(event) => {
2139 assert!(event.invoked.len() == origins.len());
2140 assert!(event.invoked.iter().all(|i| origins.contains(i)));
2141 }
2142 _ => panic!("Expected migration event"),
2143 }
2144 }
2145
2146 #[tokio::test]
2147 async fn migration_tracks_latency() {
2148 migration_tracks_latency_driver(Stage::Off, Operation::Read, vec![Origin::Old]).await;
2149 migration_tracks_latency_driver(Stage::DualWrite, Operation::Read, vec![Origin::Old]).await;
2150 migration_tracks_latency_driver(
2151 Stage::Shadow,
2152 Operation::Read,
2153 vec![Origin::Old, Origin::New],
2154 )
2155 .await;
2156 migration_tracks_latency_driver(
2157 Stage::Live,
2158 Operation::Read,
2159 vec![Origin::Old, Origin::New],
2160 )
2161 .await;
2162 migration_tracks_latency_driver(Stage::Rampdown, Operation::Read, vec![Origin::New]).await;
2163 migration_tracks_latency_driver(Stage::Complete, Operation::Read, vec![Origin::New]).await;
2164 migration_tracks_latency_driver(Stage::Off, Operation::Write, vec![Origin::Old]).await;
2165 migration_tracks_latency_driver(
2166 Stage::DualWrite,
2167 Operation::Write,
2168 vec![Origin::Old, Origin::New],
2169 )
2170 .await;
2171 migration_tracks_latency_driver(
2172 Stage::Shadow,
2173 Operation::Write,
2174 vec![Origin::Old, Origin::New],
2175 )
2176 .await;
2177 migration_tracks_latency_driver(
2178 Stage::Live,
2179 Operation::Write,
2180 vec![Origin::Old, Origin::New],
2181 )
2182 .await;
2183 migration_tracks_latency_driver(
2184 Stage::Rampdown,
2185 Operation::Write,
2186 vec![Origin::Old, Origin::New],
2187 )
2188 .await;
2189 migration_tracks_latency_driver(Stage::Complete, Operation::Write, vec![Origin::New]).await;
2190 }
2191
2192 async fn migration_tracks_latency_driver(
2193 stage: Stage,
2194 operation: Operation,
2195 origins: Vec<Origin>,
2196 ) {
2197 let (client, event_rx) = make_mocked_client();
2198 let client = Arc::new(client);
2199 client.start_with_default_executor();
2200 client
2201 .data_store
2202 .write()
2203 .upsert(
2204 "stage-flag",
2205 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2206 )
2207 .expect("patch should apply");
2208
2209 let mut migrator = MigratorBuilder::new(client.clone())
2210 .track_latency(true)
2211 .read(
2212 |_| {
2213 async move {
2214 async_std::task::sleep(Duration::from_millis(100)).await;
2215 Ok(serde_json::Value::Null)
2216 }
2217 .boxed()
2218 },
2219 |_| {
2220 async move {
2221 async_std::task::sleep(Duration::from_millis(100)).await;
2222 Ok(serde_json::Value::Null)
2223 }
2224 .boxed()
2225 },
2226 Some(|_, _| true),
2227 )
2228 .write(
2229 |_| {
2230 async move {
2231 async_std::task::sleep(Duration::from_millis(100)).await;
2232 Ok(serde_json::Value::Null)
2233 }
2234 .boxed()
2235 },
2236 |_| {
2237 async move {
2238 async_std::task::sleep(Duration::from_millis(100)).await;
2239 Ok(serde_json::Value::Null)
2240 }
2241 .boxed()
2242 },
2243 )
2244 .build()
2245 .expect("migrator should build");
2246
2247 let context = ContextBuilder::new("bob")
2248 .build()
2249 .expect("Failed to create context");
2250
2251 if let Operation::Read = operation {
2252 migrator
2253 .read(
2254 &context,
2255 "stage-flag".into(),
2256 Stage::Off,
2257 serde_json::Value::Null,
2258 )
2259 .await;
2260 } else {
2261 migrator
2262 .write(
2263 &context,
2264 "stage-flag".into(),
2265 Stage::Off,
2266 serde_json::Value::Null,
2267 )
2268 .await;
2269 }
2270
2271 client.flush();
2272 client.close();
2273
2274 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2275 assert_eq!(events.len(), 3);
2276 match &events[1] {
2277 OutputEvent::MigrationOp(event) => {
2278 assert!(event.latency.len() == origins.len());
2279 assert!(event
2280 .latency
2281 .values()
2282 .all(|l| l > &Duration::from_millis(100)));
2283 }
2284 _ => panic!("Expected migration event"),
2285 }
2286 }
2287
2288 #[tokio::test]
2289 async fn migration_tracks_read_errors() {
2290 migration_tracks_read_errors_driver(Stage::Off, vec![Origin::Old]).await;
2291 migration_tracks_read_errors_driver(Stage::DualWrite, vec![Origin::Old]).await;
2292 migration_tracks_read_errors_driver(Stage::Shadow, vec![Origin::Old, Origin::New]).await;
2293 migration_tracks_read_errors_driver(Stage::Live, vec![Origin::Old, Origin::New]).await;
2294 migration_tracks_read_errors_driver(Stage::Rampdown, vec![Origin::New]).await;
2295 migration_tracks_read_errors_driver(Stage::Complete, vec![Origin::New]).await;
2296 }
2297
2298 async fn migration_tracks_read_errors_driver(stage: Stage, origins: Vec<Origin>) {
2299 let (client, event_rx) = make_mocked_client();
2300 let client = Arc::new(client);
2301 client.start_with_default_executor();
2302 client
2303 .data_store
2304 .write()
2305 .upsert(
2306 "stage-flag",
2307 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2308 )
2309 .expect("patch should apply");
2310
2311 let mut migrator = MigratorBuilder::new(client.clone())
2312 .track_latency(true)
2313 .read(
2314 |_| async move { Err("fail".into()) }.boxed(),
2315 |_| async move { Err("fail".into()) }.boxed(),
2316 Some(|_: &String, _: &String| true),
2317 )
2318 .write(
2319 |_| async move { Err("fail".into()) }.boxed(),
2320 |_| async move { Err("fail".into()) }.boxed(),
2321 )
2322 .build()
2323 .expect("migrator should build");
2324
2325 let context = ContextBuilder::new("bob")
2326 .build()
2327 .expect("Failed to create context");
2328
2329 migrator
2330 .read(
2331 &context,
2332 "stage-flag".into(),
2333 Stage::Off,
2334 serde_json::Value::Null,
2335 )
2336 .await;
2337 client.flush();
2338 client.close();
2339
2340 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2341 assert_eq!(events.len(), 3);
2342 match &events[1] {
2343 OutputEvent::MigrationOp(event) => {
2344 assert!(event.errors.len() == origins.len());
2345 assert!(event.errors.iter().all(|i| origins.contains(i)));
2346 }
2347 _ => panic!("Expected migration event"),
2348 }
2349 }
2350
2351 #[tokio::test]
2352 async fn migration_tracks_authoritative_write_errors() {
2353 migration_tracks_authoritative_write_errors_driver(Stage::Off, vec![Origin::Old]).await;
2354 migration_tracks_authoritative_write_errors_driver(Stage::DualWrite, vec![Origin::Old])
2355 .await;
2356 migration_tracks_authoritative_write_errors_driver(Stage::Shadow, vec![Origin::Old]).await;
2357 migration_tracks_authoritative_write_errors_driver(Stage::Live, vec![Origin::New]).await;
2358 migration_tracks_authoritative_write_errors_driver(Stage::Rampdown, vec![Origin::New])
2359 .await;
2360 migration_tracks_authoritative_write_errors_driver(Stage::Complete, vec![Origin::New])
2361 .await;
2362 }
2363
2364 async fn migration_tracks_authoritative_write_errors_driver(
2365 stage: Stage,
2366 origins: Vec<Origin>,
2367 ) {
2368 let (client, event_rx) = make_mocked_client();
2369 let client = Arc::new(client);
2370 client.start_with_default_executor();
2371 client
2372 .data_store
2373 .write()
2374 .upsert(
2375 "stage-flag",
2376 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2377 )
2378 .expect("patch should apply");
2379
2380 let mut migrator = MigratorBuilder::new(client.clone())
2381 .track_latency(true)
2382 .read(
2383 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2384 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2385 None,
2386 )
2387 .write(
2388 |_| async move { Err("fail".into()) }.boxed(),
2389 |_| async move { Err("fail".into()) }.boxed(),
2390 )
2391 .build()
2392 .expect("migrator should build");
2393
2394 let context = ContextBuilder::new("bob")
2395 .build()
2396 .expect("Failed to create context");
2397
2398 migrator
2399 .write(
2400 &context,
2401 "stage-flag".into(),
2402 Stage::Off,
2403 serde_json::Value::Null,
2404 )
2405 .await;
2406
2407 client.flush();
2408 client.close();
2409
2410 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2411 assert_eq!(events.len(), 3);
2412 match &events[1] {
2413 OutputEvent::MigrationOp(event) => {
2414 assert!(event.errors.len() == origins.len());
2415 assert!(event.errors.iter().all(|i| origins.contains(i)));
2416 }
2417 _ => panic!("Expected migration event"),
2418 }
2419 }
2420
2421 #[tokio::test]
2422 async fn migration_tracks_nonauthoritative_write_errors() {
2423 migration_tracks_nonauthoritative_write_errors_driver(
2424 Stage::DualWrite,
2425 false,
2426 true,
2427 vec![Origin::New],
2428 )
2429 .await;
2430 migration_tracks_nonauthoritative_write_errors_driver(
2431 Stage::Shadow,
2432 false,
2433 true,
2434 vec![Origin::New],
2435 )
2436 .await;
2437 migration_tracks_nonauthoritative_write_errors_driver(
2438 Stage::Live,
2439 true,
2440 false,
2441 vec![Origin::Old],
2442 )
2443 .await;
2444 migration_tracks_nonauthoritative_write_errors_driver(
2445 Stage::Rampdown,
2446 true,
2447 false,
2448 vec![Origin::Old],
2449 )
2450 .await;
2451 }
2452
2453 async fn migration_tracks_nonauthoritative_write_errors_driver(
2454 stage: Stage,
2455 fail_old: bool,
2456 fail_new: bool,
2457 origins: Vec<Origin>,
2458 ) {
2459 let (client, event_rx) = make_mocked_client();
2460 let client = Arc::new(client);
2461 client.start_with_default_executor();
2462 client
2463 .data_store
2464 .write()
2465 .upsert(
2466 "stage-flag",
2467 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2468 )
2469 .expect("patch should apply");
2470
2471 let mut migrator = MigratorBuilder::new(client.clone())
2472 .track_latency(true)
2473 .read(
2474 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2475 |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2476 None,
2477 )
2478 .write(
2479 move |_| {
2480 async move {
2481 if fail_old {
2482 Err("fail".into())
2483 } else {
2484 Ok(serde_json::Value::Null)
2485 }
2486 }
2487 .boxed()
2488 },
2489 move |_| {
2490 async move {
2491 if fail_new {
2492 Err("fail".into())
2493 } else {
2494 Ok(serde_json::Value::Null)
2495 }
2496 }
2497 .boxed()
2498 },
2499 )
2500 .build()
2501 .expect("migrator should build");
2502
2503 let context = ContextBuilder::new("bob")
2504 .build()
2505 .expect("Failed to create context");
2506
2507 migrator
2508 .write(
2509 &context,
2510 "stage-flag".into(),
2511 Stage::Off,
2512 serde_json::Value::Null,
2513 )
2514 .await;
2515
2516 client.flush();
2517 client.close();
2518
2519 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2520 assert_eq!(events.len(), 3);
2521 match &events[1] {
2522 OutputEvent::MigrationOp(event) => {
2523 assert!(event.errors.len() == origins.len());
2524 assert!(event.errors.iter().all(|i| origins.contains(i)));
2525 }
2526 _ => panic!("Expected migration event"),
2527 }
2528 }
2529
2530 #[tokio::test]
2531 async fn migration_tracks_consistency() {
2532 migration_tracks_consistency_driver(Stage::Shadow, "same", "same", true).await;
2533 migration_tracks_consistency_driver(Stage::Shadow, "same", "different", false).await;
2534 migration_tracks_consistency_driver(Stage::Live, "same", "same", true).await;
2535 migration_tracks_consistency_driver(Stage::Live, "same", "different", false).await;
2536 }
2537
2538 async fn migration_tracks_consistency_driver(
2539 stage: Stage,
2540 old_return: &'static str,
2541 new_return: &'static str,
2542 expected_consistency: bool,
2543 ) {
2544 let (client, event_rx) = make_mocked_client();
2545 let client = Arc::new(client);
2546 client.start_with_default_executor();
2547 client
2548 .data_store
2549 .write()
2550 .upsert(
2551 "stage-flag",
2552 PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2553 )
2554 .expect("patch should apply");
2555
2556 let mut migrator = MigratorBuilder::new(client.clone())
2557 .track_latency(true)
2558 .read(
2559 |_| {
2560 async move {
2561 async_std::task::sleep(Duration::from_millis(100)).await;
2562 Ok(serde_json::Value::String(old_return.to_string()))
2563 }
2564 .boxed()
2565 },
2566 |_| {
2567 async move {
2568 async_std::task::sleep(Duration::from_millis(100)).await;
2569 Ok(serde_json::Value::String(new_return.to_string()))
2570 }
2571 .boxed()
2572 },
2573 Some(|lhs, rhs| lhs == rhs),
2574 )
2575 .write(
2576 |_| {
2577 async move {
2578 async_std::task::sleep(Duration::from_millis(100)).await;
2579 Ok(serde_json::Value::Null)
2580 }
2581 .boxed()
2582 },
2583 |_| {
2584 async move {
2585 async_std::task::sleep(Duration::from_millis(100)).await;
2586 Ok(serde_json::Value::Null)
2587 }
2588 .boxed()
2589 },
2590 )
2591 .build()
2592 .expect("migrator should build");
2593
2594 let context = ContextBuilder::new("bob")
2595 .build()
2596 .expect("Failed to create context");
2597
2598 migrator
2599 .read(
2600 &context,
2601 "stage-flag".into(),
2602 Stage::Off,
2603 serde_json::Value::Null,
2604 )
2605 .await;
2606
2607 client.flush();
2608 client.close();
2609
2610 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2611 assert_eq!(events.len(), 3);
2612 match &events[1] {
2613 OutputEvent::MigrationOp(event) => {
2614 assert!(event.consistency_check == Some(expected_consistency))
2615 }
2616 _ => panic!("Expected migration event"),
2617 }
2618 }
2619
2620 fn make_mocked_client_with_delay(
2621 delay: u64,
2622 offline: bool,
2623 daemon_mode: bool,
2624 ) -> (Client, Receiver<OutputEvent>) {
2625 let updates = Arc::new(MockDataSource::new_with_init_delay(delay));
2626 let (event_sender, event_rx) = create_event_sender();
2627
2628 let config = ConfigBuilder::new("sdk-key")
2629 .offline(offline)
2630 .daemon_mode(daemon_mode)
2631 .data_source(MockDataSourceBuilder::new().data_source(updates))
2632 .event_processor(
2633 EventProcessorBuilder::<HttpConnector>::new().event_sender(Arc::new(event_sender)),
2634 )
2635 .build()
2636 .expect("config should build");
2637
2638 let client = Client::build(config).expect("Should be built.");
2639
2640 (client, event_rx)
2641 }
2642
2643 fn make_mocked_offline_client() -> (Client, Receiver<OutputEvent>) {
2644 make_mocked_client_with_delay(0, true, false)
2645 }
2646
2647 fn make_mocked_client() -> (Client, Receiver<OutputEvent>) {
2648 make_mocked_client_with_delay(0, false, false)
2649 }
2650}