launchdarkly_server_sdk/
client.rs

1use eval::Context;
2use parking_lot::RwLock;
3use std::io;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use tokio::runtime::Runtime;
8
9use launchdarkly_server_sdk_evaluation::{self as eval, Detail, FlagValue, PrerequisiteEvent};
10use serde::Serialize;
11use thiserror::Error;
12use tokio::sync::{broadcast, Semaphore};
13
14use super::config::Config;
15use super::data_source::{DataSource, EventReceived};
16use super::data_source_builders::BuildError as DataSourceError;
17use super::evaluation::{FlagDetail, FlagDetailConfig};
18use super::stores::store::DataStore;
19use super::stores::store_builders::BuildError as DataStoreError;
20use crate::config::BuildError as ConfigBuildError;
21use crate::events::event::EventFactory;
22use crate::events::event::InputEvent;
23use crate::events::processor::EventProcessor;
24use crate::events::processor_builders::BuildError as EventProcessorError;
25use crate::{MigrationOpTracker, Stage};
26
/// Bundles the event-related settings used for one flavor of flag evaluation.
///
/// [Client::build] creates two of these: one whose factories are built with
/// `EventFactory::new(false)` and one whose factories are built with `EventFactory::new(true)`.
struct EventsScope {
    /// When `true`, the client skips generating analytics events for this scope.
    /// Populated from `config.offline()` in [Client::build].
    disabled: bool,
    /// Factory used to create the events emitted for this scope.
    event_factory: EventFactory,
    /// Recorder passed to the evaluator so that prerequisite evaluations can be reported.
    prerequisite_event_recorder: Box<dyn eval::PrerequisiteEventRecorder + Send + Sync>,
}
32
/// Turns prerequisite evaluations into evaluation events and forwards them to the event
/// processor.
struct PrerequisiteEventRecorder {
    /// Factory used to build the evaluation event for each recorded prerequisite.
    event_factory: EventFactory,
    /// Processor that receives the events built by this recorder.
    event_processor: Arc<dyn EventProcessor>,
}
37
38impl eval::PrerequisiteEventRecorder for PrerequisiteEventRecorder {
39    fn record(&self, event: PrerequisiteEvent) {
40        let evt = self.event_factory.new_eval_event(
41            &event.prerequisite_flag.key,
42            event.context.clone(),
43            &event.prerequisite_flag,
44            event.prerequisite_result,
45            FlagValue::Json(serde_json::Value::Null),
46            Some(event.target_flag_key),
47        );
48
49        self.event_processor.send(evt);
50    }
51}
52
/// Error type used to represent failures when building a [Client] instance.
///
/// Errors raised by the data source, data store, event processor and config builders are
/// converted into this type through the `From` implementations in this module, each of which
/// keeps only the source error's display text.
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum BuildError {
    /// Error used when a configuration setting is invalid. This typically indicates an invalid URL.
    #[error("invalid client config: {0}")]
    InvalidConfig(String),
}
61
62impl From<DataSourceError> for BuildError {
63    fn from(error: DataSourceError) -> Self {
64        Self::InvalidConfig(error.to_string())
65    }
66}
67
68impl From<DataStoreError> for BuildError {
69    fn from(error: DataStoreError) -> Self {
70        Self::InvalidConfig(error.to_string())
71    }
72}
73
74impl From<EventProcessorError> for BuildError {
75    fn from(error: EventProcessorError) -> Self {
76        Self::InvalidConfig(error.to_string())
77    }
78}
79
80impl From<ConfigBuildError> for BuildError {
81    fn from(error: ConfigBuildError) -> Self {
82        Self::InvalidConfig(error.to_string())
83    }
84}
85
/// Error type used to represent failures when starting the [Client].
#[non_exhaustive]
#[derive(Debug, Error)]
pub enum StartError {
    /// Error used when spawning a background thread fails. Wraps the underlying I/O error.
    #[error("couldn't spawn background thread for client: {0}")]
    SpawnFailed(io::Error),
}
94
/// Initialization progress of a [Client].
///
/// The explicit discriminants allow the state to be stored in an `AtomicUsize` and converted
/// back with the `From<usize>` implementation.
#[derive(PartialEq, Copy, Clone, Debug)]
enum ClientInitState {
    /// Starting value, before the data source has reported an initialization result.
    Initializing = 0,
    /// The data source reported a successful initialization.
    Initialized = 1,
    /// The data source reported that initialization failed.
    InitializationFailed = 2,
}
101
102impl PartialEq<usize> for ClientInitState {
103    fn eq(&self, other: &usize) -> bool {
104        *self as usize == *other
105    }
106}
107
108impl From<usize> for ClientInitState {
109    fn from(val: usize) -> Self {
110        match val {
111            0 => ClientInitState::Initializing,
112            1 => ClientInitState::Initialized,
113            2 => ClientInitState::InitializationFailed,
114            _ => unreachable!(),
115        }
116    }
117}
118
119/// A client for the LaunchDarkly API.
120///
121/// In order to create a client instance, first create a config using [crate::ConfigBuilder].
122///
123/// # Examples
124///
125/// Creating a client, with default configuration.
126/// ```
127/// # use launchdarkly_server_sdk::{Client, ConfigBuilder, BuildError};
128/// # fn main() -> Result<(), BuildError> {
129///     let ld_client = Client::build(ConfigBuilder::new("sdk-key").build()?)?;
130/// #   Ok(())
131/// # }
132/// ```
133///
134/// Creating an instance which connects to a relay proxy.
135/// ```
136/// # use launchdarkly_server_sdk::{Client, ConfigBuilder, ServiceEndpointsBuilder, BuildError};
137/// # fn main() -> Result<(), BuildError> {
138///     let ld_client = Client::build(ConfigBuilder::new("sdk-key")
139///         .service_endpoints(ServiceEndpointsBuilder::new()
140///             .relay_proxy("http://my-relay-hostname:8080")
141///         ).build()?
142///     )?;
143/// #   Ok(())
144/// # }
145/// ```
146///
147/// Each builder type includes usage examples for the builder.
pub struct Client {
    /// Processor that analytics events are handed to; also flushed and closed through this handle.
    event_processor: Arc<dyn EventProcessor>,
    /// Data source that is subscribed to when the client is started.
    data_source: Arc<dyn DataSource>,
    /// Store that flags are read from during evaluation; also handed to the data source on start.
    data_store: Arc<RwLock<dyn DataStore>>,
    /// Event scope built with `EventFactory::new(false)`.
    events_default: EventsScope,
    /// Event scope built with `EventFactory::new(true)`; used by the `*_detail` evaluations.
    events_with_reasons: EventsScope,
    /// Semaphore that receives a permit when the data source reports an initialization result.
    init_notify: Arc<Semaphore>,
    /// Current [ClientInitState], stored as its `usize` discriminant.
    init_state: Arc<AtomicUsize>,
    /// Set once one of the `start_*` methods has been called.
    started: AtomicBool,
    /// Copied from `config.offline()` when the client is built.
    offline: bool,
    /// Copied from `config.daemon_mode()` when the client is built.
    daemon_mode: bool,
    /// SDK key from the config; used as the HMAC key in [Client::secure_mode_hash].
    sdk_key: String,
    /// Sender used by [Client::close] to signal shutdown; a receiver is created on start.
    shutdown_broadcast: broadcast::Sender<()>,
    /// Runtime created by [Client::start_with_runtime], if any; taken and dropped on close.
    runtime: RwLock<Option<Runtime>>,
}
163
164impl Client {
165    /// Create a new instance of a [Client] based on the provided [Config] parameter.
166    pub fn build(config: Config) -> Result<Self, BuildError> {
167        if config.offline() {
168            info!("Started LaunchDarkly Client in offline mode");
169        } else if config.daemon_mode() {
170            info!("Started LaunchDarkly Client in daemon mode");
171        }
172
173        let tags = config.application_tag();
174
175        let endpoints = config.service_endpoints_builder().build()?;
176        let event_processor =
177            config
178                .event_processor_builder()
179                .build(&endpoints, config.sdk_key(), tags.clone())?;
180        let data_source =
181            config
182                .data_source_builder()
183                .build(&endpoints, config.sdk_key(), tags.clone())?;
184        let data_store = config.data_store_builder().build()?;
185
186        let events_default = EventsScope {
187            disabled: config.offline(),
188            event_factory: EventFactory::new(false),
189            prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
190                event_factory: EventFactory::new(false),
191                event_processor: event_processor.clone(),
192            }),
193        };
194
195        let events_with_reasons = EventsScope {
196            disabled: config.offline(),
197            event_factory: EventFactory::new(true),
198            prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
199                event_factory: EventFactory::new(true),
200                event_processor: event_processor.clone(),
201            }),
202        };
203
204        let (shutdown_tx, _) = broadcast::channel(1);
205
206        Ok(Client {
207            event_processor,
208            data_source,
209            data_store,
210            events_default,
211            events_with_reasons,
212            init_notify: Arc::new(Semaphore::new(0)),
213            init_state: Arc::new(AtomicUsize::new(ClientInitState::Initializing as usize)),
214            started: AtomicBool::new(false),
215            offline: config.offline(),
216            daemon_mode: config.daemon_mode(),
217            sdk_key: config.sdk_key().into(),
218            shutdown_broadcast: shutdown_tx,
219            runtime: RwLock::new(None),
220        })
221    }
222
223    /// Starts a client in the current thread, which must have a default tokio
224    /// runtime. This variant accepts a callback for tracking the time of the
225    /// last processed server event.
226    pub fn start_with_default_executor_and_callback(&self, event_received: EventReceived) {
227        if self.started.load(Ordering::SeqCst) {
228            return;
229        }
230        self.started.store(true, Ordering::SeqCst);
231        self.start_with_default_executor_internal(event_received);
232    }
233
234    /// Starts a client in the current thread, which must have a default tokio runtime.
235    pub fn start_with_default_executor(&self) {
236        if self.started.load(Ordering::SeqCst) {
237            return;
238        }
239        self.started.store(true, Ordering::SeqCst);
240        self.start_with_default_executor_internal(Arc::new(move |_ev| {}));
241    }
242
243    fn start_with_default_executor_internal(&self, event_received: EventReceived) {
244        // These clones are going to move into the closure, we
245        // do not want to move or reference `self`, because
246        // then lifetimes will get involved.
247        let notify = self.init_notify.clone();
248        let init_state = self.init_state.clone();
249
250        self.data_source.subscribe(
251            self.data_store.clone(),
252            Arc::new(move |success| {
253                init_state.store(
254                    (if success {
255                        ClientInitState::Initialized
256                    } else {
257                        ClientInitState::InitializationFailed
258                    }) as usize,
259                    Ordering::SeqCst,
260                );
261                notify.add_permits(1);
262            }),
263            event_received,
264            self.shutdown_broadcast.subscribe(),
265        );
266    }
267
268    /// Creates a new tokio runtime and then starts the client. Tasks from the client will
269    /// be executed on created runtime.
270    /// If your application already has a tokio runtime, then you can use
271    /// [crate::Client::start_with_default_executor] and the client will dispatch tasks to
272    /// your existing runtime.
273    pub fn start_with_runtime(&self) -> Result<bool, StartError> {
274        if self.started.load(Ordering::SeqCst) {
275            return Ok(true);
276        }
277        self.started.store(true, Ordering::SeqCst);
278
279        let runtime = Runtime::new().map_err(StartError::SpawnFailed)?;
280        let _guard = runtime.enter();
281        self.runtime.write().replace(runtime);
282
283        self.start_with_default_executor_internal(Arc::new(move |_ev| {}));
284
285        Ok(true)
286    }
287
    /// This is an async method that will resolve once initialization is complete.
    /// Initialization being complete does not mean that initialization was a success.
    /// The return value from the method indicates if the client successfully initialized.
    ///
    /// This method applies no timeout of its own; see [Client::wait_for_initialization] for the
    /// bounded variant.
    #[deprecated(
        note = "blocking without a timeout is discouraged, use wait_for_initialization instead"
    )]
    pub async fn initialized_async(&self) -> bool {
        self.initialized_async_internal().await
    }
297
298    /// This is an async method that will resolve once initialization is complete or the specified
299    /// timeout has occurred.
300    ///
301    /// If the timeout is triggered, this method will return `None`. Otherwise, the method will
302    /// return a boolean indicating whether or not the SDK has successfully initialized.
303    pub async fn wait_for_initialization(&self, timeout: Duration) -> Option<bool> {
304        if timeout > Duration::from_secs(60) {
305            warn!("wait_for_initialization was configured to block for up to {} seconds. We recommend blocking no longer than 60 seconds.", timeout.as_secs());
306        }
307
308        let initialized = tokio::time::timeout(timeout, self.initialized_async_internal()).await;
309        match initialized {
310            Ok(result) => Some(result),
311            Err(_) => None,
312        }
313    }
314
315    async fn initialized_async_internal(&self) -> bool {
316        if self.offline || self.daemon_mode {
317            return true;
318        }
319
320        // If the client is not initialized, then we need to wait for it to be initialized.
321        // Because we are using atomic types, and not a lock, then there is still the possibility
322        // that the value will change between the read and when we wait. We use a semaphore to wait,
323        // and we do not forget the permit, therefore if the permit has been added, then we will get
324        // it very quickly and reduce blocking.
325        if ClientInitState::Initialized != self.init_state.load(Ordering::SeqCst) {
326            let _permit = self.init_notify.acquire().await;
327        }
328        ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
329    }
330
331    /// This function synchronously returns if the SDK is initialized.
332    /// In the case of unrecoverable errors in establishing a connection it is possible for the
333    /// SDK to never become initialized.
334    pub fn initialized(&self) -> bool {
335        self.offline
336            || self.daemon_mode
337            || ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
338    }
339
340    /// Close shuts down the LaunchDarkly client. After calling this, the LaunchDarkly client
341    /// should no longer be used. The method will block until all pending analytics events (if any)
342    /// been sent.
343    pub fn close(&self) {
344        self.event_processor.close();
345
346        // If the system is in offline mode or daemon mode, no receiver will be listening to this
347        // broadcast channel, so sending on it would always result in an error.
348        if !self.offline && !self.daemon_mode {
349            if let Err(e) = self.shutdown_broadcast.send(()) {
350                error!("Failed to shutdown client appropriately: {}", e);
351            }
352        }
353
354        // Potentially take the runtime we created when starting the client and do nothing with it
355        // so it drops, closing out all spawned tasks.
356        self.runtime.write().take();
357    }
358
    /// Flush tells the client that all pending analytics events (if any) should be delivered as
    /// soon as possible. Flushing is asynchronous, so this method will return before it is
    /// complete. However, if you call [Client::close], events are guaranteed to be sent before
    /// that method returns.
    ///
    /// This delegates directly to the event processor's `flush`.
    ///
    /// For more information, see the Reference Guide:
    /// <https://docs.launchdarkly.com/sdk/features/flush#rust>.
    pub fn flush(&self) {
        self.event_processor.flush();
    }
369
370    /// Identify reports details about a context.
371    ///
372    /// For more information, see the Reference Guide:
373    /// <https://docs.launchdarkly.com/sdk/features/identify#rust>
374    pub fn identify(&self, context: Context) {
375        if self.events_default.disabled {
376            return;
377        }
378
379        self.send_internal(self.events_default.event_factory.new_identify(context));
380    }
381
382    /// Returns the value of a boolean feature flag for a given context.
383    ///
384    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
385    /// off and has no off variation.
386    ///
387    /// For more information, see the Reference Guide:
388    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
389    pub fn bool_variation(&self, context: &Context, flag_key: &str, default: bool) -> bool {
390        let val = self.variation(context, flag_key, default);
391        if let Some(b) = val.as_bool() {
392            b
393        } else {
394            warn!(
395                "bool_variation called for a non-bool flag {:?} (got {:?})",
396                flag_key, val
397            );
398            default
399        }
400    }
401
402    /// Returns the value of a string feature flag for a given context.
403    ///
404    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
405    /// off and has no off variation.
406    ///
407    /// For more information, see the Reference Guide:
408    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
409    pub fn str_variation(&self, context: &Context, flag_key: &str, default: String) -> String {
410        let val = self.variation(context, flag_key, default.clone());
411        if let Some(s) = val.as_string() {
412            s
413        } else {
414            warn!(
415                "str_variation called for a non-string flag {:?} (got {:?})",
416                flag_key, val
417            );
418            default
419        }
420    }
421
422    /// Returns the value of a float feature flag for a given context.
423    ///
424    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
425    /// off and has no off variation.
426    ///
427    /// For more information, see the Reference Guide:
428    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
429    pub fn float_variation(&self, context: &Context, flag_key: &str, default: f64) -> f64 {
430        let val = self.variation(context, flag_key, default);
431        if let Some(f) = val.as_float() {
432            f
433        } else {
434            warn!(
435                "float_variation called for a non-float flag {:?} (got {:?})",
436                flag_key, val
437            );
438            default
439        }
440    }
441
442    /// Returns the value of a integer feature flag for a given context.
443    ///
444    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
445    /// off and has no off variation.
446    ///
447    /// For more information, see the Reference Guide:
448    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
449    pub fn int_variation(&self, context: &Context, flag_key: &str, default: i64) -> i64 {
450        let val = self.variation(context, flag_key, default);
451        if let Some(f) = val.as_int() {
452            f
453        } else {
454            warn!(
455                "int_variation called for a non-int flag {:?} (got {:?})",
456                flag_key, val
457            );
458            default
459        }
460    }
461
462    /// Returns the value of a feature flag for the given context, allowing the value to be
463    /// of any JSON type.
464    ///
465    /// The value is returned as an [serde_json::Value].
466    ///
467    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned off.
468    ///
469    /// For more information, see the Reference Guide:
470    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
471    pub fn json_variation(
472        &self,
473        context: &Context,
474        flag_key: &str,
475        default: serde_json::Value,
476    ) -> serde_json::Value {
477        self.variation(context, flag_key, default.clone())
478            .as_json()
479            .unwrap_or(default)
480    }
481
482    /// This method is the same as [Client::bool_variation], but also returns further information
483    /// about how the value was calculated. The "reason" data will also be included in analytics
484    /// events.
485    ///
486    /// For more information, see the Reference Guide:
487    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
488    pub fn bool_variation_detail(
489        &self,
490        context: &Context,
491        flag_key: &str,
492        default: bool,
493    ) -> Detail<bool> {
494        self.variation_detail(context, flag_key, default).try_map(
495            |val| val.as_bool(),
496            default,
497            eval::Error::WrongType,
498        )
499    }
500
501    /// This method is the same as [Client::str_variation], but also returns further information
502    /// about how the value was calculated. The "reason" data will also be included in analytics
503    /// events.
504    ///
505    /// For more information, see the Reference Guide:
506    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
507    pub fn str_variation_detail(
508        &self,
509        context: &Context,
510        flag_key: &str,
511        default: String,
512    ) -> Detail<String> {
513        self.variation_detail(context, flag_key, default.clone())
514            .try_map(|val| val.as_string(), default, eval::Error::WrongType)
515    }
516
517    /// This method is the same as [Client::float_variation], but also returns further information
518    /// about how the value was calculated. The "reason" data will also be included in analytics
519    /// events.
520    ///
521    /// For more information, see the Reference Guide:
522    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
523    pub fn float_variation_detail(
524        &self,
525        context: &Context,
526        flag_key: &str,
527        default: f64,
528    ) -> Detail<f64> {
529        self.variation_detail(context, flag_key, default).try_map(
530            |val| val.as_float(),
531            default,
532            eval::Error::WrongType,
533        )
534    }
535
536    /// This method is the same as [Client::int_variation], but also returns further information
537    /// about how the value was calculated. The "reason" data will also be included in analytics
538    /// events.
539    ///
540    /// For more information, see the Reference Guide:
541    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
542    pub fn int_variation_detail(
543        &self,
544        context: &Context,
545        flag_key: &str,
546        default: i64,
547    ) -> Detail<i64> {
548        self.variation_detail(context, flag_key, default).try_map(
549            |val| val.as_int(),
550            default,
551            eval::Error::WrongType,
552        )
553    }
554
555    /// This method is the same as [Client::json_variation], but also returns further information
556    /// about how the value was calculated. The "reason" data will also be included in analytics
557    /// events.
558    ///
559    /// For more information, see the Reference Guide:
560    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
561    pub fn json_variation_detail(
562        &self,
563        context: &Context,
564        flag_key: &str,
565        default: serde_json::Value,
566    ) -> Detail<serde_json::Value> {
567        self.variation_detail(context, flag_key, default.clone())
568            .try_map(|val| val.as_json(), default, eval::Error::WrongType)
569    }
570
571    /// Generates the secure mode hash value for a context.
572    ///
573    /// For more information, see the Reference Guide:
574    /// <https://docs.launchdarkly.com/sdk/features/secure-mode#rust>.
575    pub fn secure_mode_hash(&self, context: &Context) -> String {
576        let key = ring::hmac::Key::new(ring::hmac::HMAC_SHA256, self.sdk_key.as_bytes());
577        let tag = ring::hmac::sign(&key, context.canonical_key().as_bytes());
578
579        data_encoding::HEXLOWER.encode(tag.as_ref())
580    }
581
582    /// Returns an object that encapsulates the state of all feature flags for a given context. This
583    /// includes the flag values, and also metadata that can be used on the front end.
584    ///
585    /// The most common use case for this method is to bootstrap a set of client-side feature flags
586    /// from a back-end service.
587    ///
588    /// You may pass any configuration of [FlagDetailConfig] to control what data is included.
589    ///
590    /// For more information, see the Reference Guide:
591    /// <https://docs.launchdarkly.com/sdk/features/all-flags#rust>
592    pub fn all_flags_detail(
593        &self,
594        context: &Context,
595        flag_state_config: FlagDetailConfig,
596    ) -> FlagDetail {
597        if self.offline {
598            warn!(
599                "all_flags_detail() called, but client is in offline mode. Returning empty state"
600            );
601            return FlagDetail::new(false);
602        }
603
604        if !self.initialized() {
605            warn!("all_flags_detail() called before client has finished initializing! Feature store unavailable - returning empty state");
606            return FlagDetail::new(false);
607        }
608
609        let data_store = self.data_store.read();
610
611        let mut flag_detail = FlagDetail::new(true);
612        flag_detail.populate(&*data_store, context, flag_state_config);
613
614        flag_detail
615    }
616
617    /// This method is the same as [Client::variation], but also returns further information about
618    /// how the value was calculated. The "reason" data will also be included in analytics events.
619    ///
620    /// For more information, see the Reference Guide:
621    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
622    pub fn variation_detail<T: Into<FlagValue> + Clone>(
623        &self,
624        context: &Context,
625        flag_key: &str,
626        default: T,
627    ) -> Detail<FlagValue> {
628        let (detail, _) =
629            self.variation_internal(context, flag_key, default, &self.events_with_reasons);
630        detail
631    }
632
633    /// This is a generic function which returns the value of a feature flag for a given context.
634    ///
635    /// This method is an alternatively to the type specified methods (e.g.
636    /// [Client::bool_variation], [Client::int_variation], etc.).
637    ///
638    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
639    /// off and has no off variation.
640    ///
641    /// For more information, see the Reference Guide:
642    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
643    pub fn variation<T: Into<FlagValue> + Clone>(
644        &self,
645        context: &Context,
646        flag_key: &str,
647        default: T,
648    ) -> FlagValue {
649        let (detail, _) = self.variation_internal(context, flag_key, default, &self.events_default);
650        detail.value.unwrap()
651    }
652
653    /// This method returns the migration stage of the migration feature flag for the given
654    /// evaluation context.
655    ///
656    /// This method returns the default stage if there is an error or the flag does not exist.
657    pub fn migration_variation(
658        &self,
659        context: &Context,
660        flag_key: &str,
661        default_stage: Stage,
662    ) -> (Stage, Arc<Mutex<MigrationOpTracker>>) {
663        let (detail, flag) =
664            self.variation_internal(context, flag_key, default_stage, &self.events_default);
665
666        let migration_detail =
667            detail.try_map(|v| v.try_into().ok(), default_stage, eval::Error::WrongType);
668        let tracker = MigrationOpTracker::new(
669            flag_key.into(),
670            flag,
671            context.clone(),
672            migration_detail.clone(),
673            default_stage,
674        );
675
676        (
677            migration_detail.value.unwrap_or(default_stage),
678            Arc::new(Mutex::new(tracker)),
679        )
680    }
681
682    /// Reports that a context has performed an event.
683    ///
684    /// The `key` parameter is defined by the application and will be shown in analytics reports;
685    /// it normally corresponds to the event name of a metric that you have created through the
686    /// LaunchDarkly dashboard. If you want to associate additional data with this event, use
687    /// [Client::track_data] or [Client::track_metric].
688    ///
689    /// For more information, see the Reference Guide:
690    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
691    pub fn track_event(&self, context: Context, key: impl Into<String>) {
692        let _ = self.track(context, key, None, serde_json::Value::Null);
693    }
694
    /// Reports that a context has performed an event, and associates it with custom data.
    ///
    /// The `key` parameter is defined by the application and will be shown in analytics reports;
    /// it normally corresponds to the event name of a metric that you have created through the
    /// LaunchDarkly dashboard.
    ///
    /// `data` parameter is any type that implements [Serialize]. If no such value is needed, use
    /// [serde_json::Value::Null] (or call [Client::track_event] instead). To send a numeric value
    /// for experimentation, use [Client::track_metric].
    ///
    /// # Errors
    ///
    /// Returns the [serde_json::Error] produced while building the custom event, in which case
    /// no event is sent.
    ///
    /// For more information, see the Reference Guide:
    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
    pub fn track_data(
        &self,
        context: Context,
        key: impl Into<String>,
        data: impl Serialize,
    ) -> serde_json::Result<()> {
        self.track(context, key, None, data)
    }
715
716    /// Reports that a context has performed an event, and associates it with a numeric value. This
717    /// value is used by the LaunchDarkly experimentation feature in numeric custom metrics, and
718    /// will also be returned as part of the custom event for Data Export.
719    ///
720    /// The `key` parameter is defined by the application and will be shown in analytics reports;
721    /// it normally corresponds to the event name of a metric that you have created through the
722    /// LaunchDarkly dashboard.
723    ///
724    /// For more information, see the Reference Guide:
725    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
726    pub fn track_metric(
727        &self,
728        context: Context,
729        key: impl Into<String>,
730        value: f64,
731        data: impl Serialize,
732    ) {
733        let _ = self.track(context, key, Some(value), data);
734    }
735
736    fn track(
737        &self,
738        context: Context,
739        key: impl Into<String>,
740        metric_value: Option<f64>,
741        data: impl Serialize,
742    ) -> serde_json::Result<()> {
743        if !self.events_default.disabled {
744            let event =
745                self.events_default
746                    .event_factory
747                    .new_custom(context, key, metric_value, data)?;
748
749            self.send_internal(event);
750        }
751
752        Ok(())
753    }
754
755    /// Tracks the results of a migrations operation. This event includes measurements which can be
756    /// used to enhance the observability of a migration within the LaunchDarkly UI.
757    ///
758    /// This event should be generated through [crate::MigrationOpTracker]. If you are using the
759    /// [crate::Migrator] to handle migrations, this event will be created and emitted
760    /// automatically.
761    pub fn track_migration_op(&self, tracker: Arc<Mutex<MigrationOpTracker>>) {
762        if self.events_default.disabled {
763            return;
764        }
765
766        match tracker.lock() {
767            Ok(tracker) => {
768                let event = tracker.build();
769                match event {
770                    Ok(event) => {
771                        self.send_internal(
772                            self.events_default.event_factory.new_migration_op(event),
773                        );
774                    }
775                    Err(e) => error!(
776                        "Failed to build migration event, no event will be sent: {}",
777                        e
778                    ),
779                }
780            }
781            Err(e) => error!(
782                "Failed to lock migration tracker, no event will be sent: {}",
783                e
784            ),
785        }
786    }
787
788    fn variation_internal<T: Into<FlagValue> + Clone>(
789        &self,
790        context: &Context,
791        flag_key: &str,
792        default: T,
793        events_scope: &EventsScope,
794    ) -> (Detail<FlagValue>, Option<eval::Flag>) {
795        if self.offline {
796            return (
797                Detail::err_default(eval::Error::ClientNotReady, default.into()),
798                None,
799            );
800        }
801
802        let (flag, result) = match self.initialized() {
803            false => (
804                None,
805                Detail::err_default(eval::Error::ClientNotReady, default.clone().into()),
806            ),
807            true => {
808                let data_store = self.data_store.read();
809                match data_store.flag(flag_key) {
810                    Some(flag) => {
811                        let result = eval::evaluate(
812                            data_store.to_store(),
813                            &flag,
814                            context,
815                            Some(&*events_scope.prerequisite_event_recorder),
816                        )
817                        .map(|v| v.clone())
818                        .or(default.clone().into());
819
820                        (Some(flag), result)
821                    }
822                    None => (
823                        None,
824                        Detail::err_default(eval::Error::FlagNotFound, default.clone().into()),
825                    ),
826                }
827            }
828        };
829
830        if !events_scope.disabled {
831            let event = match &flag {
832                Some(f) => events_scope.event_factory.new_eval_event(
833                    flag_key,
834                    context.clone(),
835                    f,
836                    result.clone(),
837                    default.into(),
838                    None,
839                ),
840                None => events_scope.event_factory.new_unknown_flag_event(
841                    flag_key,
842                    context.clone(),
843                    result.clone(),
844                    default.into(),
845                ),
846            };
847            self.send_internal(event);
848        }
849
850        (result, flag)
851    }
852
853    fn send_internal(&self, event: InputEvent) {
854        self.event_processor.send(event);
855    }
856}
857
858#[cfg(test)]
859mod tests {
860    use assert_json_diff::assert_json_eq;
861    use crossbeam_channel::Receiver;
862    use eval::{ContextBuilder, MultiContextBuilder};
863    use futures::FutureExt;
864    use hyper::client::HttpConnector;
865    use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment};
866    use maplit::hashmap;
867    use std::collections::HashMap;
868    use tokio::time::Instant;
869
870    use crate::data_source::MockDataSource;
871    use crate::data_source_builders::MockDataSourceBuilder;
872    use crate::events::create_event_sender;
873    use crate::events::event::{OutputEvent, VariationKey};
874    use crate::events::processor_builders::EventProcessorBuilder;
875    use crate::stores::persistent_store::tests::InMemoryPersistentDataStore;
876    use crate::stores::store_types::{PatchTarget, StorageItem};
877    use crate::test_common::{
878        self, basic_flag, basic_flag_with_prereq, basic_flag_with_prereqs_and_visibility,
879        basic_flag_with_visibility, basic_int_flag, basic_migration_flag, basic_off_flag,
880    };
881    use crate::{
882        AllData, ConfigBuilder, MigratorBuilder, NullEventProcessorBuilder, Operation, Origin,
883        PersistentDataStore, PersistentDataStoreBuilder, PersistentDataStoreFactory,
884        SerializedItem,
885    };
886    use test_case::test_case;
887
888    use super::*;
889
890    fn is_send_and_sync<T: Send + Sync>() {}
891
892    #[test]
893    fn ensure_client_is_send_and_sync() {
894        is_send_and_sync::<Client>()
895    }
896
897    #[tokio::test]
898    async fn client_asynchronously_initializes() {
899        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
900        client.start_with_default_executor();
901
902        let now = Instant::now();
903        let initialized = client.initialized_async().await;
904        let elapsed_time = now.elapsed();
905        assert!(initialized);
906        // Give ourself a good margin for thread scheduling.
907        assert!(elapsed_time.as_millis() > 500)
908    }
909
910    #[tokio::test]
911    async fn client_asynchronously_initializes_within_timeout() {
912        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
913        client.start_with_default_executor();
914
915        let now = Instant::now();
916        let initialized = client
917            .wait_for_initialization(Duration::from_millis(1500))
918            .await;
919        let elapsed_time = now.elapsed();
920        // Give ourself a good margin for thread scheduling.
921        assert!(elapsed_time.as_millis() > 500);
922        assert_eq!(initialized, Some(true));
923    }
924
925    #[tokio::test]
926    async fn client_asynchronously_initializes_slower_than_timeout() {
927        let (client, _event_rx) = make_mocked_client_with_delay(2000, false, false);
928        client.start_with_default_executor();
929
930        let now = Instant::now();
931        let initialized = client
932            .wait_for_initialization(Duration::from_millis(500))
933            .await;
934        let elapsed_time = now.elapsed();
935        // Give ourself a good margin for thread scheduling.
936        assert!(elapsed_time.as_millis() < 750);
937        assert!(initialized.is_none());
938    }
939
940    #[tokio::test]
941    async fn client_initializes_immediately_in_offline_mode() {
942        let (client, _event_rx) = make_mocked_client_with_delay(1000, true, false);
943        client.start_with_default_executor();
944
945        assert!(client.initialized());
946
947        let now = Instant::now();
948        let initialized = client
949            .wait_for_initialization(Duration::from_millis(2000))
950            .await;
951        let elapsed_time = now.elapsed();
952        assert_eq!(initialized, Some(true));
953        assert!(elapsed_time.as_millis() < 500)
954    }
955
956    #[tokio::test]
957    async fn client_initializes_immediately_in_daemon_mode() {
958        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, true);
959        client.start_with_default_executor();
960
961        assert!(client.initialized());
962
963        let now = Instant::now();
964        let initialized = client
965            .wait_for_initialization(Duration::from_millis(2000))
966            .await;
967        let elapsed_time = now.elapsed();
968        assert_eq!(initialized, Some(true));
969        assert!(elapsed_time.as_millis() < 500)
970    }
971
972    #[test_case(basic_flag("myFlag"), false.into(), true.into())]
973    #[test_case(basic_int_flag("myFlag"), 0.into(), test_common::FLOAT_TO_INT_MAX.into())]
974    fn client_updates_changes_evaluation_results(
975        flag: eval::Flag,
976        default: FlagValue,
977        expected: FlagValue,
978    ) {
979        let context = ContextBuilder::new("foo")
980            .build()
981            .expect("Failed to create context");
982
983        let (client, _event_rx) = make_mocked_client();
984
985        let result = client.variation_detail(&context, "myFlag", default.clone());
986        assert_eq!(result.value.unwrap(), default);
987
988        client.start_with_default_executor();
989        client
990            .data_store
991            .write()
992            .upsert(
993                &flag.key,
994                PatchTarget::Flag(StorageItem::Item(flag.clone())),
995            )
996            .expect("patch should apply");
997
998        let result = client.variation_detail(&context, "myFlag", default);
999        assert_eq!(result.value.unwrap(), expected);
1000        assert!(matches!(
1001            result.reason,
1002            Reason::Fallthrough {
1003                in_experiment: false
1004            }
1005        ));
1006    }
1007
1008    #[test]
1009    fn all_flags_detail_is_invalid_when_offline() {
1010        let (client, _event_rx) = make_mocked_offline_client();
1011        client.start_with_default_executor();
1012
1013        let context = ContextBuilder::new("bob")
1014            .build()
1015            .expect("Failed to create context");
1016
1017        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1018        assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1019    }
1020
1021    #[test]
1022    fn all_flags_detail_is_invalid_when_not_initialized() {
1023        let (client, _event_rx) = make_mocked_client();
1024
1025        let context = ContextBuilder::new("bob")
1026            .build()
1027            .expect("Failed to create context");
1028
1029        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1030        assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1031    }
1032
1033    #[test]
1034    fn all_flags_detail_returns_flag_states() {
1035        let (client, _event_rx) = make_mocked_client();
1036        client.start_with_default_executor();
1037        client
1038            .data_store
1039            .write()
1040            .upsert(
1041                "myFlag1",
1042                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag1"))),
1043            )
1044            .expect("patch should apply");
1045        client
1046            .data_store
1047            .write()
1048            .upsert(
1049                "myFlag2",
1050                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag2"))),
1051            )
1052            .expect("patch should apply");
1053        let context = ContextBuilder::new("bob")
1054            .build()
1055            .expect("Failed to create context");
1056
1057        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1058
1059        client.close();
1060
1061        assert_json_eq!(
1062            all_flags,
1063            json!({
1064                "myFlag1": true,
1065                "myFlag2": true,
1066                "$flagsState": {
1067                    "myFlag1": {
1068                        "version": 42,
1069                        "variation": 1
1070                    },
1071                     "myFlag2": {
1072                        "version": 42,
1073                        "variation": 1
1074                    },
1075                },
1076                "$valid": true
1077            })
1078        );
1079    }
1080
1081    #[test]
1082    fn all_flags_detail_returns_prerequisite_relations() {
1083        let (client, _event_rx) = make_mocked_client();
1084        client.start_with_default_executor();
1085        client
1086            .data_store
1087            .write()
1088            .upsert(
1089                "prereq1",
1090                PatchTarget::Flag(StorageItem::Item(basic_flag("prereq1"))),
1091            )
1092            .expect("patch should apply");
1093        client
1094            .data_store
1095            .write()
1096            .upsert(
1097                "prereq2",
1098                PatchTarget::Flag(StorageItem::Item(basic_flag("prereq2"))),
1099            )
1100            .expect("patch should apply");
1101
1102        client
1103            .data_store
1104            .write()
1105            .upsert(
1106                "toplevel",
1107                PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1108                    "toplevel",
1109                    &["prereq1", "prereq2"],
1110                    false,
1111                ))),
1112            )
1113            .expect("patch should apply");
1114
1115        let context = ContextBuilder::new("bob")
1116            .build()
1117            .expect("Failed to create context");
1118
1119        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1120
1121        client.close();
1122
1123        assert_json_eq!(
1124            all_flags,
1125            json!({
1126                "prereq1": true,
1127                "prereq2": true,
1128                "toplevel": true,
1129                "$flagsState": {
1130                    "toplevel": {
1131                        "version": 42,
1132                        "variation": 1,
1133                        "prerequisites": ["prereq1", "prereq2"]
1134                    },
1135                    "prereq1": {
1136                        "version": 42,
1137                        "variation": 1
1138                    },
1139                     "prereq2": {
1140                        "version": 42,
1141                        "variation": 1
1142                    },
1143                },
1144                "$valid": true
1145            })
1146        );
1147    }
1148
1149    #[test]
1150    fn all_flags_detail_returns_prerequisite_relations_when_not_visible_to_clients() {
1151        let (client, _event_rx) = make_mocked_client();
1152        client.start_with_default_executor();
1153        client
1154            .data_store
1155            .write()
1156            .upsert(
1157                "prereq1",
1158                PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1159                    "prereq1", false,
1160                ))),
1161            )
1162            .expect("patch should apply");
1163        client
1164            .data_store
1165            .write()
1166            .upsert(
1167                "prereq2",
1168                PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1169                    "prereq2", false,
1170                ))),
1171            )
1172            .expect("patch should apply");
1173
1174        client
1175            .data_store
1176            .write()
1177            .upsert(
1178                "toplevel",
1179                PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1180                    "toplevel",
1181                    &["prereq1", "prereq2"],
1182                    true,
1183                ))),
1184            )
1185            .expect("patch should apply");
1186
1187        let context = ContextBuilder::new("bob")
1188            .build()
1189            .expect("Failed to create context");
1190
1191        let mut config = FlagDetailConfig::new();
1192        config.client_side_only();
1193
1194        let all_flags = client.all_flags_detail(&context, config);
1195
1196        client.close();
1197
1198        assert_json_eq!(
1199            all_flags,
1200            json!({
1201                "toplevel": true,
1202                "$flagsState": {
1203                    "toplevel": {
1204                        "version": 42,
1205                        "variation": 1,
1206                        "prerequisites": ["prereq1", "prereq2"]
1207                    },
1208                },
1209                "$valid": true
1210            })
1211        );
1212    }
1213
1214    #[test]
1215    fn variation_tracks_events_correctly() {
1216        let (client, event_rx) = make_mocked_client();
1217        client.start_with_default_executor();
1218        client
1219            .data_store
1220            .write()
1221            .upsert(
1222                "myFlag",
1223                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1224            )
1225            .expect("patch should apply");
1226        let context = ContextBuilder::new("bob")
1227            .build()
1228            .expect("Failed to create context");
1229
1230        let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1231
1232        assert!(flag_value.as_bool().unwrap());
1233        client.flush();
1234        client.close();
1235
1236        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1237        assert_eq!(events.len(), 2);
1238        assert_eq!(events[0].kind(), "index");
1239        assert_eq!(events[1].kind(), "summary");
1240
1241        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1242            let variation_key = VariationKey {
1243                version: Some(42),
1244                variation: Some(1),
1245            };
1246            let feature = event_summary.features.get("myFlag");
1247            assert!(feature.is_some());
1248
1249            let feature = feature.unwrap();
1250            assert!(feature.counters.contains_key(&variation_key));
1251        } else {
1252            panic!("Event should be a summary type");
1253        }
1254    }
1255
1256    #[test]
1257    fn variation_handles_offline_mode() {
1258        let (client, event_rx) = make_mocked_offline_client();
1259        client.start_with_default_executor();
1260
1261        let context = ContextBuilder::new("bob")
1262            .build()
1263            .expect("Failed to create context");
1264        let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1265
1266        assert!(!flag_value.as_bool().unwrap());
1267        client.flush();
1268        client.close();
1269
1270        assert_eq!(event_rx.iter().count(), 0);
1271    }
1272
1273    #[test]
1274    fn variation_handles_unknown_flags() {
1275        let (client, event_rx) = make_mocked_client();
1276        client.start_with_default_executor();
1277        let context = ContextBuilder::new("bob")
1278            .build()
1279            .expect("Failed to create context");
1280
1281        let flag_value = client.variation(&context, "non-existent-flag", FlagValue::Bool(false));
1282
1283        assert!(!flag_value.as_bool().unwrap());
1284        client.flush();
1285        client.close();
1286
1287        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1288        assert_eq!(events.len(), 2);
1289        assert_eq!(events[0].kind(), "index");
1290        assert_eq!(events[1].kind(), "summary");
1291
1292        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1293            let variation_key = VariationKey {
1294                version: None,
1295                variation: None,
1296            };
1297
1298            let feature = event_summary.features.get("non-existent-flag");
1299            assert!(feature.is_some());
1300
1301            let feature = feature.unwrap();
1302            assert!(feature.counters.contains_key(&variation_key));
1303        } else {
1304            panic!("Event should be a summary type");
1305        }
1306    }
1307
1308    #[test]
1309    fn variation_detail_handles_debug_events_correctly() {
1310        let (client, event_rx) = make_mocked_client();
1311        client.start_with_default_executor();
1312
1313        let mut flag = basic_flag("myFlag");
1314        flag.debug_events_until_date = Some(64_060_606_800_000); // Jan. 1st, 4000
1315
1316        client
1317            .data_store
1318            .write()
1319            .upsert(
1320                &flag.key,
1321                PatchTarget::Flag(StorageItem::Item(flag.clone())),
1322            )
1323            .expect("patch should apply");
1324        let context = ContextBuilder::new("bob")
1325            .build()
1326            .expect("Failed to create context");
1327
1328        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1329
1330        assert!(detail.value.unwrap().as_bool().unwrap());
1331        assert!(matches!(
1332            detail.reason,
1333            Reason::Fallthrough {
1334                in_experiment: false
1335            }
1336        ));
1337        client.flush();
1338        client.close();
1339
1340        let events = event_rx.try_iter().collect::<Vec<OutputEvent>>();
1341        assert_eq!(events.len(), 3);
1342        assert_eq!(events[0].kind(), "index");
1343        assert_eq!(events[1].kind(), "debug");
1344        assert_eq!(events[2].kind(), "summary");
1345
1346        if let OutputEvent::Summary(event_summary) = events[2].clone() {
1347            let variation_key = VariationKey {
1348                version: Some(42),
1349                variation: Some(1),
1350            };
1351
1352            let feature = event_summary.features.get("myFlag");
1353            assert!(feature.is_some());
1354
1355            let feature = feature.unwrap();
1356            assert!(feature.counters.contains_key(&variation_key));
1357        } else {
1358            panic!("Event should be a summary type");
1359        }
1360    }
1361
1362    #[test]
1363    fn variation_detail_tracks_events_correctly() {
1364        let (client, event_rx) = make_mocked_client();
1365        client.start_with_default_executor();
1366
1367        client
1368            .data_store
1369            .write()
1370            .upsert(
1371                "myFlag",
1372                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1373            )
1374            .expect("patch should apply");
1375        let context = ContextBuilder::new("bob")
1376            .build()
1377            .expect("Failed to create context");
1378
1379        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1380
1381        assert!(detail.value.unwrap().as_bool().unwrap());
1382        assert!(matches!(
1383            detail.reason,
1384            Reason::Fallthrough {
1385                in_experiment: false
1386            }
1387        ));
1388        client.flush();
1389        client.close();
1390
1391        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1392        assert_eq!(events.len(), 2);
1393        assert_eq!(events[0].kind(), "index");
1394        assert_eq!(events[1].kind(), "summary");
1395
1396        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1397            let variation_key = VariationKey {
1398                version: Some(42),
1399                variation: Some(1),
1400            };
1401
1402            let feature = event_summary.features.get("myFlag");
1403            assert!(feature.is_some());
1404
1405            let feature = feature.unwrap();
1406            assert!(feature.counters.contains_key(&variation_key));
1407        } else {
1408            panic!("Event should be a summary type");
1409        }
1410    }
1411
1412    #[test]
1413    fn variation_detail_handles_offline_mode() {
1414        let (client, event_rx) = make_mocked_offline_client();
1415        client.start_with_default_executor();
1416
1417        let context = ContextBuilder::new("bob")
1418            .build()
1419            .expect("Failed to create context");
1420
1421        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1422
1423        assert!(!detail.value.unwrap().as_bool().unwrap());
1424        assert!(matches!(
1425            detail.reason,
1426            Reason::Error {
1427                error: eval::Error::ClientNotReady
1428            }
1429        ));
1430        client.flush();
1431        client.close();
1432
1433        assert_eq!(event_rx.iter().count(), 0);
1434    }
1435
    /// Test factory that produces [InMemoryPersistentDataStore] instances seeded from fixed data.
    struct InMemoryPersistentDataStoreFactory {
        // Flags and segments that are serialized into every store this factory creates.
        data: AllData<Flag, Segment>,
        // Copied into the `initialized` field of every store this factory creates.
        initialized: bool,
    }
1440
1441    impl PersistentDataStoreFactory for InMemoryPersistentDataStoreFactory {
1442        fn create_persistent_data_store(
1443            &self,
1444        ) -> Result<Box<(dyn PersistentDataStore + 'static)>, std::io::Error> {
1445            let serialized_data =
1446                AllData::<SerializedItem, SerializedItem>::try_from(self.data.clone())?;
1447            Ok(Box::new(InMemoryPersistentDataStore {
1448                data: serialized_data,
1449                initialized: self.initialized,
1450            }))
1451        }
1452    }
1453
1454    #[test]
1455    fn variation_detail_handles_daemon_mode() {
1456        testing_logger::setup();
1457        let factory = InMemoryPersistentDataStoreFactory {
1458            data: AllData {
1459                flags: hashmap!["flag".into() => basic_flag("flag")],
1460                segments: HashMap::new(),
1461            },
1462            initialized: true,
1463        };
1464        let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1465
1466        let config = ConfigBuilder::new("sdk-key")
1467            .daemon_mode(true)
1468            .data_store(&builder)
1469            .event_processor(&NullEventProcessorBuilder::new())
1470            .build()
1471            .expect("config should build");
1472
1473        let client = Client::build(config).expect("Should be built.");
1474
1475        client.start_with_default_executor();
1476
1477        let context = ContextBuilder::new("bob")
1478            .build()
1479            .expect("Failed to create context");
1480
1481        let detail = client.variation_detail(&context, "flag", FlagValue::Bool(false));
1482
1483        assert!(detail.value.unwrap().as_bool().unwrap());
1484        assert!(matches!(
1485            detail.reason,
1486            Reason::Fallthrough {
1487                in_experiment: false
1488            }
1489        ));
1490        client.flush();
1491        client.close();
1492
1493        testing_logger::validate(|captured_logs| {
1494            assert_eq!(captured_logs.len(), 1);
1495            assert_eq!(
1496                captured_logs[0].body,
1497                "Started LaunchDarkly Client in daemon mode"
1498            );
1499        });
1500    }
1501
1502    #[test]
1503    fn daemon_mode_is_quiet_if_store_is_not_initialized() {
1504        testing_logger::setup();
1505
1506        let factory = InMemoryPersistentDataStoreFactory {
1507            data: AllData {
1508                flags: HashMap::new(),
1509                segments: HashMap::new(),
1510            },
1511            initialized: false,
1512        };
1513        let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1514
1515        let config = ConfigBuilder::new("sdk-key")
1516            .daemon_mode(true)
1517            .data_store(&builder)
1518            .event_processor(&NullEventProcessorBuilder::new())
1519            .build()
1520            .expect("config should build");
1521
1522        let client = Client::build(config).expect("Should be built.");
1523
1524        client.start_with_default_executor();
1525
1526        let context = ContextBuilder::new("bob")
1527            .build()
1528            .expect("Failed to create context");
1529
1530        client.variation_detail(&context, "flag", FlagValue::Bool(false));
1531
1532        testing_logger::validate(|captured_logs| {
1533            assert_eq!(captured_logs.len(), 1);
1534            assert_eq!(
1535                captured_logs[0].body,
1536                "Started LaunchDarkly Client in daemon mode"
1537            );
1538        });
1539    }
1540
1541    #[test]
1542    fn variation_handles_off_flag_without_variation() {
1543        let (client, event_rx) = make_mocked_client();
1544        client.start_with_default_executor();
1545
1546        client
1547            .data_store
1548            .write()
1549            .upsert(
1550                "myFlag",
1551                PatchTarget::Flag(StorageItem::Item(basic_off_flag("myFlag"))),
1552            )
1553            .expect("patch should apply");
1554        let context = ContextBuilder::new("bob")
1555            .build()
1556            .expect("Failed to create context");
1557
1558        let result = client.variation(&context, "myFlag", FlagValue::Bool(false));
1559
1560        assert!(!result.as_bool().unwrap());
1561        client.flush();
1562        client.close();
1563
1564        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1565        assert_eq!(events.len(), 2);
1566        assert_eq!(events[0].kind(), "index");
1567        assert_eq!(events[1].kind(), "summary");
1568
1569        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1570            let variation_key = VariationKey {
1571                version: Some(42),
1572                variation: None,
1573            };
1574            let feature = event_summary.features.get("myFlag");
1575            assert!(feature.is_some());
1576
1577            let feature = feature.unwrap();
1578            assert!(feature.counters.contains_key(&variation_key));
1579        } else {
1580            panic!("Event should be a summary type");
1581        }
1582    }
1583
1584    #[test]
1585    fn variation_detail_tracks_prereq_events_correctly() {
1586        let (client, event_rx) = make_mocked_client();
1587        client.start_with_default_executor();
1588
1589        let mut basic_preqreq_flag = basic_flag("prereqFlag");
1590        basic_preqreq_flag.track_events = true;
1591
1592        client
1593            .data_store
1594            .write()
1595            .upsert(
1596                "prereqFlag",
1597                PatchTarget::Flag(StorageItem::Item(basic_preqreq_flag)),
1598            )
1599            .expect("patch should apply");
1600
1601        let mut basic_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1602        basic_flag.track_events = true;
1603        client
1604            .data_store
1605            .write()
1606            .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(basic_flag)))
1607            .expect("patch should apply");
1608        let context = ContextBuilder::new("bob")
1609            .build()
1610            .expect("Failed to create context");
1611
1612        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1613
1614        assert!(detail.value.unwrap().as_bool().unwrap());
1615        assert!(matches!(
1616            detail.reason,
1617            Reason::Fallthrough {
1618                in_experiment: false
1619            }
1620        ));
1621        client.flush();
1622        client.close();
1623
1624        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1625        assert_eq!(events.len(), 4);
1626        assert_eq!(events[0].kind(), "index");
1627        assert_eq!(events[1].kind(), "feature");
1628        assert_eq!(events[2].kind(), "feature");
1629        assert_eq!(events[3].kind(), "summary");
1630
1631        if let OutputEvent::Summary(event_summary) = events[3].clone() {
1632            let variation_key = VariationKey {
1633                version: Some(42),
1634                variation: Some(1),
1635            };
1636            let feature = event_summary.features.get("myFlag");
1637            assert!(feature.is_some());
1638
1639            let feature = feature.unwrap();
1640            assert!(feature.counters.contains_key(&variation_key));
1641
1642            let variation_key = VariationKey {
1643                version: Some(42),
1644                variation: Some(1),
1645            };
1646            let feature = event_summary.features.get("prereqFlag");
1647            assert!(feature.is_some());
1648
1649            let feature = feature.unwrap();
1650            assert!(feature.counters.contains_key(&variation_key));
1651        }
1652    }
1653
1654    #[test]
1655    fn variation_handles_failed_prereqs_correctly() {
1656        let (client, event_rx) = make_mocked_client();
1657        client.start_with_default_executor();
1658
1659        let mut basic_preqreq_flag = basic_off_flag("prereqFlag");
1660        basic_preqreq_flag.track_events = true;
1661
1662        client
1663            .data_store
1664            .write()
1665            .upsert(
1666                "prereqFlag",
1667                PatchTarget::Flag(StorageItem::Item(basic_preqreq_flag)),
1668            )
1669            .expect("patch should apply");
1670
1671        let mut basic_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1672        basic_flag.track_events = true;
1673        client
1674            .data_store
1675            .write()
1676            .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(basic_flag)))
1677            .expect("patch should apply");
1678        let context = ContextBuilder::new("bob")
1679            .build()
1680            .expect("Failed to create context");
1681
1682        let detail = client.variation(&context, "myFlag", FlagValue::Bool(false));
1683
1684        assert!(!detail.as_bool().unwrap());
1685        client.flush();
1686        client.close();
1687
1688        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1689        assert_eq!(events.len(), 4);
1690        assert_eq!(events[0].kind(), "index");
1691        assert_eq!(events[1].kind(), "feature");
1692        assert_eq!(events[2].kind(), "feature");
1693        assert_eq!(events[3].kind(), "summary");
1694
1695        if let OutputEvent::Summary(event_summary) = events[3].clone() {
1696            let variation_key = VariationKey {
1697                version: Some(42),
1698                variation: Some(0),
1699            };
1700            let feature = event_summary.features.get("myFlag");
1701            assert!(feature.is_some());
1702
1703            let feature = feature.unwrap();
1704            assert!(feature.counters.contains_key(&variation_key));
1705
1706            let variation_key = VariationKey {
1707                version: Some(42),
1708                variation: None,
1709            };
1710            let feature = event_summary.features.get("prereqFlag");
1711            assert!(feature.is_some());
1712
1713            let feature = feature.unwrap();
1714            assert!(feature.counters.contains_key(&variation_key));
1715        }
1716    }
1717
1718    #[test]
1719    fn variation_detail_handles_flag_not_found() {
1720        let (client, event_rx) = make_mocked_client();
1721        client.start_with_default_executor();
1722
1723        let context = ContextBuilder::new("bob")
1724            .build()
1725            .expect("Failed to create context");
1726        let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1727
1728        assert!(!detail.value.unwrap().as_bool().unwrap());
1729        assert!(matches!(
1730            detail.reason,
1731            Reason::Error {
1732                error: eval::Error::FlagNotFound
1733            }
1734        ));
1735        client.flush();
1736        client.close();
1737
1738        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1739        assert_eq!(events.len(), 2);
1740        assert_eq!(events[0].kind(), "index");
1741        assert_eq!(events[1].kind(), "summary");
1742
1743        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1744            let variation_key = VariationKey {
1745                version: None,
1746                variation: None,
1747            };
1748            let feature = event_summary.features.get("non-existent-flag");
1749            assert!(feature.is_some());
1750
1751            let feature = feature.unwrap();
1752            assert!(feature.counters.contains_key(&variation_key));
1753        } else {
1754            panic!("Event should be a summary type");
1755        }
1756    }
1757
1758    #[tokio::test]
1759    async fn variation_detail_handles_client_not_ready() {
1760        let (client, event_rx) = make_mocked_client_with_delay(u64::MAX, false, false);
1761        client.start_with_default_executor();
1762        let context = ContextBuilder::new("bob")
1763            .build()
1764            .expect("Failed to create context");
1765
1766        let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1767
1768        assert!(!detail.value.unwrap().as_bool().unwrap());
1769        assert!(matches!(
1770            detail.reason,
1771            Reason::Error {
1772                error: eval::Error::ClientNotReady
1773            }
1774        ));
1775        client.flush();
1776        client.close();
1777
1778        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1779        assert_eq!(events.len(), 2);
1780        assert_eq!(events[0].kind(), "index");
1781        assert_eq!(events[1].kind(), "summary");
1782
1783        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1784            let variation_key = VariationKey {
1785                version: None,
1786                variation: None,
1787            };
1788            let feature = event_summary.features.get("non-existent-flag");
1789            assert!(feature.is_some());
1790
1791            let feature = feature.unwrap();
1792            assert!(feature.counters.contains_key(&variation_key));
1793        } else {
1794            panic!("Event should be a summary type");
1795        }
1796    }
1797
1798    #[test]
1799    fn identify_sends_identify_event() {
1800        let (client, event_rx) = make_mocked_client();
1801        client.start_with_default_executor();
1802
1803        let context = ContextBuilder::new("bob")
1804            .build()
1805            .expect("Failed to create context");
1806
1807        client.identify(context);
1808        client.flush();
1809        client.close();
1810
1811        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1812        assert_eq!(events.len(), 1);
1813        assert_eq!(events[0].kind(), "identify");
1814    }
1815
1816    #[test]
1817    fn identify_sends_sends_nothing_in_offline_mode() {
1818        let (client, event_rx) = make_mocked_offline_client();
1819        client.start_with_default_executor();
1820
1821        let context = ContextBuilder::new("bob")
1822            .build()
1823            .expect("Failed to create context");
1824
1825        client.identify(context);
1826        client.flush();
1827        client.close();
1828
1829        assert_eq!(event_rx.iter().count(), 0);
1830    }
1831
1832    #[test]
1833    fn secure_mode_hash() {
1834        let config = ConfigBuilder::new("secret")
1835            .offline(true)
1836            .build()
1837            .expect("config should build");
1838        let client = Client::build(config).expect("Should be built.");
1839        let context = ContextBuilder::new("Message")
1840            .build()
1841            .expect("Failed to create context");
1842
1843        assert_eq!(
1844            client.secure_mode_hash(&context),
1845            "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597"
1846        );
1847    }
1848
1849    #[test]
1850    fn secure_mode_hash_with_multi_kind() {
1851        let config = ConfigBuilder::new("secret")
1852            .offline(true)
1853            .build()
1854            .expect("config should build");
1855        let client = Client::build(config).expect("Should be built.");
1856
1857        let org = ContextBuilder::new("org-key|1")
1858            .kind("org")
1859            .build()
1860            .expect("Failed to create context");
1861        let user = ContextBuilder::new("user-key:2")
1862            .build()
1863            .expect("Failed to create context");
1864
1865        let context = MultiContextBuilder::new()
1866            .add_context(org)
1867            .add_context(user)
1868            .build()
1869            .expect("failed to build multi-context");
1870
1871        assert_eq!(
1872            client.secure_mode_hash(&context),
1873            "5687e6383b920582ed50c2a96c98a115f1b6aad85a60579d761d9b8797415163"
1874        );
1875    }
1876
    /// Minimal serializable payload used by the track tests to exercise
    /// `track_data` with a user-defined struct.
    #[derive(Serialize)]
    struct MyCustomData {
        /// Arbitrary numeric field; the track tests set it to 42.
        pub answer: u32,
    }
1881
1882    #[test]
1883    fn track_sends_track_and_index_events() -> serde_json::Result<()> {
1884        let (client, event_rx) = make_mocked_client();
1885        client.start_with_default_executor();
1886
1887        let context = ContextBuilder::new("bob")
1888            .build()
1889            .expect("Failed to create context");
1890
1891        client.track_event(context.clone(), "event-with-null");
1892        client.track_data(context.clone(), "event-with-string", "string-data")?;
1893        client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1894        client.track_data(
1895            context.clone(),
1896            "event-with-struct",
1897            MyCustomData { answer: 42 },
1898        )?;
1899        client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1900
1901        client.flush();
1902        client.close();
1903
1904        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1905        assert_eq!(events.len(), 6);
1906
1907        let mut events_by_type: HashMap<&str, usize> = HashMap::new();
1908        for event in events {
1909            if let Some(count) = events_by_type.get_mut(event.kind()) {
1910                *count += 1;
1911            } else {
1912                events_by_type.insert(event.kind(), 1);
1913            }
1914        }
1915        assert!(matches!(events_by_type.get("index"), Some(1)));
1916        assert!(matches!(events_by_type.get("custom"), Some(5)));
1917
1918        Ok(())
1919    }
1920
1921    #[test]
1922    fn track_sends_nothing_in_offline_mode() -> serde_json::Result<()> {
1923        let (client, event_rx) = make_mocked_offline_client();
1924        client.start_with_default_executor();
1925
1926        let context = ContextBuilder::new("bob")
1927            .build()
1928            .expect("Failed to create context");
1929
1930        client.track_event(context.clone(), "event-with-null");
1931        client.track_data(context.clone(), "event-with-string", "string-data")?;
1932        client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1933        client.track_data(
1934            context.clone(),
1935            "event-with-struct",
1936            MyCustomData { answer: 42 },
1937        )?;
1938        client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1939
1940        client.flush();
1941        client.close();
1942
1943        assert_eq!(event_rx.iter().count(), 0);
1944
1945        Ok(())
1946    }
1947
1948    #[test]
1949    fn migration_handles_flag_not_found() {
1950        let (client, _event_rx) = make_mocked_client();
1951        client.start_with_default_executor();
1952
1953        let context = ContextBuilder::new("bob")
1954            .build()
1955            .expect("Failed to create context");
1956
1957        let (stage, _tracker) =
1958            client.migration_variation(&context, "non-existent-flag-key", Stage::Off);
1959
1960        assert_eq!(stage, Stage::Off);
1961    }
1962
1963    #[test]
1964    fn migration_uses_non_migration_flag() {
1965        let (client, _event_rx) = make_mocked_client();
1966        client.start_with_default_executor();
1967        client
1968            .data_store
1969            .write()
1970            .upsert(
1971                "boolean-flag",
1972                PatchTarget::Flag(StorageItem::Item(basic_flag("boolean-flag"))),
1973            )
1974            .expect("patch should apply");
1975
1976        let context = ContextBuilder::new("bob")
1977            .build()
1978            .expect("Failed to create context");
1979
1980        let (stage, _tracker) = client.migration_variation(&context, "boolean-flag", Stage::Off);
1981
1982        assert_eq!(stage, Stage::Off);
1983    }
1984
1985    #[test_case(Stage::Off)]
1986    #[test_case(Stage::DualWrite)]
1987    #[test_case(Stage::Shadow)]
1988    #[test_case(Stage::Live)]
1989    #[test_case(Stage::Rampdown)]
1990    #[test_case(Stage::Complete)]
1991    fn migration_can_determine_correct_stage_from_flag(stage: Stage) {
1992        let (client, _event_rx) = make_mocked_client();
1993        client.start_with_default_executor();
1994        client
1995            .data_store
1996            .write()
1997            .upsert(
1998                "stage-flag",
1999                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2000            )
2001            .expect("patch should apply");
2002
2003        let context = ContextBuilder::new("bob")
2004            .build()
2005            .expect("Failed to create context");
2006
2007        let (evaluated_stage, _tracker) =
2008            client.migration_variation(&context, "stage-flag", Stage::Off);
2009
2010        assert_eq!(evaluated_stage, stage);
2011    }
2012
2013    #[tokio::test]
2014    async fn migration_tracks_invoked_correctly() {
2015        migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Read, vec![Origin::Old])
2016            .await;
2017        migration_tracks_invoked_correctly_driver(
2018            Stage::DualWrite,
2019            Operation::Read,
2020            vec![Origin::Old],
2021        )
2022        .await;
2023        migration_tracks_invoked_correctly_driver(
2024            Stage::Shadow,
2025            Operation::Read,
2026            vec![Origin::Old, Origin::New],
2027        )
2028        .await;
2029        migration_tracks_invoked_correctly_driver(
2030            Stage::Live,
2031            Operation::Read,
2032            vec![Origin::Old, Origin::New],
2033        )
2034        .await;
2035        migration_tracks_invoked_correctly_driver(
2036            Stage::Rampdown,
2037            Operation::Read,
2038            vec![Origin::New],
2039        )
2040        .await;
2041        migration_tracks_invoked_correctly_driver(
2042            Stage::Complete,
2043            Operation::Read,
2044            vec![Origin::New],
2045        )
2046        .await;
2047        migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Write, vec![Origin::Old])
2048            .await;
2049        migration_tracks_invoked_correctly_driver(
2050            Stage::DualWrite,
2051            Operation::Write,
2052            vec![Origin::Old, Origin::New],
2053        )
2054        .await;
2055        migration_tracks_invoked_correctly_driver(
2056            Stage::Shadow,
2057            Operation::Write,
2058            vec![Origin::Old, Origin::New],
2059        )
2060        .await;
2061        migration_tracks_invoked_correctly_driver(
2062            Stage::Live,
2063            Operation::Write,
2064            vec![Origin::Old, Origin::New],
2065        )
2066        .await;
2067        migration_tracks_invoked_correctly_driver(
2068            Stage::Rampdown,
2069            Operation::Write,
2070            vec![Origin::Old, Origin::New],
2071        )
2072        .await;
2073        migration_tracks_invoked_correctly_driver(
2074            Stage::Complete,
2075            Operation::Write,
2076            vec![Origin::New],
2077        )
2078        .await;
2079    }
2080
2081    async fn migration_tracks_invoked_correctly_driver(
2082        stage: Stage,
2083        operation: Operation,
2084        origins: Vec<Origin>,
2085    ) {
2086        let (client, event_rx) = make_mocked_client();
2087        let client = Arc::new(client);
2088        client.start_with_default_executor();
2089        client
2090            .data_store
2091            .write()
2092            .upsert(
2093                "stage-flag",
2094                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2095            )
2096            .expect("patch should apply");
2097
2098        let mut migrator = MigratorBuilder::new(client.clone())
2099            .read(
2100                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2101                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2102                Some(|_, _| true),
2103            )
2104            .write(
2105                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2106                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2107            )
2108            .build()
2109            .expect("migrator should build");
2110
2111        let context = ContextBuilder::new("bob")
2112            .build()
2113            .expect("Failed to create context");
2114
2115        if let Operation::Read = operation {
2116            migrator
2117                .read(
2118                    &context,
2119                    "stage-flag".into(),
2120                    Stage::Off,
2121                    serde_json::Value::Null,
2122                )
2123                .await;
2124        } else {
2125            migrator
2126                .write(
2127                    &context,
2128                    "stage-flag".into(),
2129                    Stage::Off,
2130                    serde_json::Value::Null,
2131                )
2132                .await;
2133        }
2134
2135        client.flush();
2136        client.close();
2137
2138        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2139        assert_eq!(events.len(), 3);
2140        match &events[1] {
2141            OutputEvent::MigrationOp(event) => {
2142                assert!(event.invoked.len() == origins.len());
2143                assert!(event.invoked.iter().all(|i| origins.contains(i)));
2144            }
2145            _ => panic!("Expected migration event"),
2146        }
2147    }
2148
2149    #[tokio::test]
2150    async fn migration_tracks_latency() {
2151        migration_tracks_latency_driver(Stage::Off, Operation::Read, vec![Origin::Old]).await;
2152        migration_tracks_latency_driver(Stage::DualWrite, Operation::Read, vec![Origin::Old]).await;
2153        migration_tracks_latency_driver(
2154            Stage::Shadow,
2155            Operation::Read,
2156            vec![Origin::Old, Origin::New],
2157        )
2158        .await;
2159        migration_tracks_latency_driver(
2160            Stage::Live,
2161            Operation::Read,
2162            vec![Origin::Old, Origin::New],
2163        )
2164        .await;
2165        migration_tracks_latency_driver(Stage::Rampdown, Operation::Read, vec![Origin::New]).await;
2166        migration_tracks_latency_driver(Stage::Complete, Operation::Read, vec![Origin::New]).await;
2167        migration_tracks_latency_driver(Stage::Off, Operation::Write, vec![Origin::Old]).await;
2168        migration_tracks_latency_driver(
2169            Stage::DualWrite,
2170            Operation::Write,
2171            vec![Origin::Old, Origin::New],
2172        )
2173        .await;
2174        migration_tracks_latency_driver(
2175            Stage::Shadow,
2176            Operation::Write,
2177            vec![Origin::Old, Origin::New],
2178        )
2179        .await;
2180        migration_tracks_latency_driver(
2181            Stage::Live,
2182            Operation::Write,
2183            vec![Origin::Old, Origin::New],
2184        )
2185        .await;
2186        migration_tracks_latency_driver(
2187            Stage::Rampdown,
2188            Operation::Write,
2189            vec![Origin::Old, Origin::New],
2190        )
2191        .await;
2192        migration_tracks_latency_driver(Stage::Complete, Operation::Write, vec![Origin::New]).await;
2193    }
2194
2195    async fn migration_tracks_latency_driver(
2196        stage: Stage,
2197        operation: Operation,
2198        origins: Vec<Origin>,
2199    ) {
2200        let (client, event_rx) = make_mocked_client();
2201        let client = Arc::new(client);
2202        client.start_with_default_executor();
2203        client
2204            .data_store
2205            .write()
2206            .upsert(
2207                "stage-flag",
2208                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2209            )
2210            .expect("patch should apply");
2211
2212        let mut migrator = MigratorBuilder::new(client.clone())
2213            .track_latency(true)
2214            .read(
2215                |_| {
2216                    async move {
2217                        async_std::task::sleep(Duration::from_millis(100)).await;
2218                        Ok(serde_json::Value::Null)
2219                    }
2220                    .boxed()
2221                },
2222                |_| {
2223                    async move {
2224                        async_std::task::sleep(Duration::from_millis(100)).await;
2225                        Ok(serde_json::Value::Null)
2226                    }
2227                    .boxed()
2228                },
2229                Some(|_, _| true),
2230            )
2231            .write(
2232                |_| {
2233                    async move {
2234                        async_std::task::sleep(Duration::from_millis(100)).await;
2235                        Ok(serde_json::Value::Null)
2236                    }
2237                    .boxed()
2238                },
2239                |_| {
2240                    async move {
2241                        async_std::task::sleep(Duration::from_millis(100)).await;
2242                        Ok(serde_json::Value::Null)
2243                    }
2244                    .boxed()
2245                },
2246            )
2247            .build()
2248            .expect("migrator should build");
2249
2250        let context = ContextBuilder::new("bob")
2251            .build()
2252            .expect("Failed to create context");
2253
2254        if let Operation::Read = operation {
2255            migrator
2256                .read(
2257                    &context,
2258                    "stage-flag".into(),
2259                    Stage::Off,
2260                    serde_json::Value::Null,
2261                )
2262                .await;
2263        } else {
2264            migrator
2265                .write(
2266                    &context,
2267                    "stage-flag".into(),
2268                    Stage::Off,
2269                    serde_json::Value::Null,
2270                )
2271                .await;
2272        }
2273
2274        client.flush();
2275        client.close();
2276
2277        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2278        assert_eq!(events.len(), 3);
2279        match &events[1] {
2280            OutputEvent::MigrationOp(event) => {
2281                assert!(event.latency.len() == origins.len());
2282                assert!(event
2283                    .latency
2284                    .values()
2285                    .all(|l| l > &Duration::from_millis(100)));
2286            }
2287            _ => panic!("Expected migration event"),
2288        }
2289    }
2290
2291    #[tokio::test]
2292    async fn migration_tracks_read_errors() {
2293        migration_tracks_read_errors_driver(Stage::Off, vec![Origin::Old]).await;
2294        migration_tracks_read_errors_driver(Stage::DualWrite, vec![Origin::Old]).await;
2295        migration_tracks_read_errors_driver(Stage::Shadow, vec![Origin::Old, Origin::New]).await;
2296        migration_tracks_read_errors_driver(Stage::Live, vec![Origin::Old, Origin::New]).await;
2297        migration_tracks_read_errors_driver(Stage::Rampdown, vec![Origin::New]).await;
2298        migration_tracks_read_errors_driver(Stage::Complete, vec![Origin::New]).await;
2299    }
2300
2301    async fn migration_tracks_read_errors_driver(stage: Stage, origins: Vec<Origin>) {
2302        let (client, event_rx) = make_mocked_client();
2303        let client = Arc::new(client);
2304        client.start_with_default_executor();
2305        client
2306            .data_store
2307            .write()
2308            .upsert(
2309                "stage-flag",
2310                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2311            )
2312            .expect("patch should apply");
2313
2314        let mut migrator = MigratorBuilder::new(client.clone())
2315            .track_latency(true)
2316            .read(
2317                |_| async move { Err("fail".into()) }.boxed(),
2318                |_| async move { Err("fail".into()) }.boxed(),
2319                Some(|_: &String, _: &String| true),
2320            )
2321            .write(
2322                |_| async move { Err("fail".into()) }.boxed(),
2323                |_| async move { Err("fail".into()) }.boxed(),
2324            )
2325            .build()
2326            .expect("migrator should build");
2327
2328        let context = ContextBuilder::new("bob")
2329            .build()
2330            .expect("Failed to create context");
2331
2332        migrator
2333            .read(
2334                &context,
2335                "stage-flag".into(),
2336                Stage::Off,
2337                serde_json::Value::Null,
2338            )
2339            .await;
2340        client.flush();
2341        client.close();
2342
2343        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2344        assert_eq!(events.len(), 3);
2345        match &events[1] {
2346            OutputEvent::MigrationOp(event) => {
2347                assert!(event.errors.len() == origins.len());
2348                assert!(event.errors.iter().all(|i| origins.contains(i)));
2349            }
2350            _ => panic!("Expected migration event"),
2351        }
2352    }
2353
2354    #[tokio::test]
2355    async fn migration_tracks_authoritative_write_errors() {
2356        migration_tracks_authoritative_write_errors_driver(Stage::Off, vec![Origin::Old]).await;
2357        migration_tracks_authoritative_write_errors_driver(Stage::DualWrite, vec![Origin::Old])
2358            .await;
2359        migration_tracks_authoritative_write_errors_driver(Stage::Shadow, vec![Origin::Old]).await;
2360        migration_tracks_authoritative_write_errors_driver(Stage::Live, vec![Origin::New]).await;
2361        migration_tracks_authoritative_write_errors_driver(Stage::Rampdown, vec![Origin::New])
2362            .await;
2363        migration_tracks_authoritative_write_errors_driver(Stage::Complete, vec![Origin::New])
2364            .await;
2365    }
2366
2367    async fn migration_tracks_authoritative_write_errors_driver(
2368        stage: Stage,
2369        origins: Vec<Origin>,
2370    ) {
2371        let (client, event_rx) = make_mocked_client();
2372        let client = Arc::new(client);
2373        client.start_with_default_executor();
2374        client
2375            .data_store
2376            .write()
2377            .upsert(
2378                "stage-flag",
2379                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2380            )
2381            .expect("patch should apply");
2382
2383        let mut migrator = MigratorBuilder::new(client.clone())
2384            .track_latency(true)
2385            .read(
2386                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2387                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2388                None,
2389            )
2390            .write(
2391                |_| async move { Err("fail".into()) }.boxed(),
2392                |_| async move { Err("fail".into()) }.boxed(),
2393            )
2394            .build()
2395            .expect("migrator should build");
2396
2397        let context = ContextBuilder::new("bob")
2398            .build()
2399            .expect("Failed to create context");
2400
2401        migrator
2402            .write(
2403                &context,
2404                "stage-flag".into(),
2405                Stage::Off,
2406                serde_json::Value::Null,
2407            )
2408            .await;
2409
2410        client.flush();
2411        client.close();
2412
2413        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2414        assert_eq!(events.len(), 3);
2415        match &events[1] {
2416            OutputEvent::MigrationOp(event) => {
2417                assert!(event.errors.len() == origins.len());
2418                assert!(event.errors.iter().all(|i| origins.contains(i)));
2419            }
2420            _ => panic!("Expected migration event"),
2421        }
2422    }
2423
2424    #[tokio::test]
2425    async fn migration_tracks_nonauthoritative_write_errors() {
2426        migration_tracks_nonauthoritative_write_errors_driver(
2427            Stage::DualWrite,
2428            false,
2429            true,
2430            vec![Origin::New],
2431        )
2432        .await;
2433        migration_tracks_nonauthoritative_write_errors_driver(
2434            Stage::Shadow,
2435            false,
2436            true,
2437            vec![Origin::New],
2438        )
2439        .await;
2440        migration_tracks_nonauthoritative_write_errors_driver(
2441            Stage::Live,
2442            true,
2443            false,
2444            vec![Origin::Old],
2445        )
2446        .await;
2447        migration_tracks_nonauthoritative_write_errors_driver(
2448            Stage::Rampdown,
2449            true,
2450            false,
2451            vec![Origin::Old],
2452        )
2453        .await;
2454    }
2455
2456    async fn migration_tracks_nonauthoritative_write_errors_driver(
2457        stage: Stage,
2458        fail_old: bool,
2459        fail_new: bool,
2460        origins: Vec<Origin>,
2461    ) {
2462        let (client, event_rx) = make_mocked_client();
2463        let client = Arc::new(client);
2464        client.start_with_default_executor();
2465        client
2466            .data_store
2467            .write()
2468            .upsert(
2469                "stage-flag",
2470                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2471            )
2472            .expect("patch should apply");
2473
2474        let mut migrator = MigratorBuilder::new(client.clone())
2475            .track_latency(true)
2476            .read(
2477                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2478                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2479                None,
2480            )
2481            .write(
2482                move |_| {
2483                    async move {
2484                        if fail_old {
2485                            Err("fail".into())
2486                        } else {
2487                            Ok(serde_json::Value::Null)
2488                        }
2489                    }
2490                    .boxed()
2491                },
2492                move |_| {
2493                    async move {
2494                        if fail_new {
2495                            Err("fail".into())
2496                        } else {
2497                            Ok(serde_json::Value::Null)
2498                        }
2499                    }
2500                    .boxed()
2501                },
2502            )
2503            .build()
2504            .expect("migrator should build");
2505
2506        let context = ContextBuilder::new("bob")
2507            .build()
2508            .expect("Failed to create context");
2509
2510        migrator
2511            .write(
2512                &context,
2513                "stage-flag".into(),
2514                Stage::Off,
2515                serde_json::Value::Null,
2516            )
2517            .await;
2518
2519        client.flush();
2520        client.close();
2521
2522        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2523        assert_eq!(events.len(), 3);
2524        match &events[1] {
2525            OutputEvent::MigrationOp(event) => {
2526                assert!(event.errors.len() == origins.len());
2527                assert!(event.errors.iter().all(|i| origins.contains(i)));
2528            }
2529            _ => panic!("Expected migration event"),
2530        }
2531    }
2532
2533    #[tokio::test]
2534    async fn migration_tracks_consistency() {
2535        migration_tracks_consistency_driver(Stage::Shadow, "same", "same", true).await;
2536        migration_tracks_consistency_driver(Stage::Shadow, "same", "different", false).await;
2537        migration_tracks_consistency_driver(Stage::Live, "same", "same", true).await;
2538        migration_tracks_consistency_driver(Stage::Live, "same", "different", false).await;
2539    }
2540
2541    async fn migration_tracks_consistency_driver(
2542        stage: Stage,
2543        old_return: &'static str,
2544        new_return: &'static str,
2545        expected_consistency: bool,
2546    ) {
2547        let (client, event_rx) = make_mocked_client();
2548        let client = Arc::new(client);
2549        client.start_with_default_executor();
2550        client
2551            .data_store
2552            .write()
2553            .upsert(
2554                "stage-flag",
2555                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2556            )
2557            .expect("patch should apply");
2558
2559        let mut migrator = MigratorBuilder::new(client.clone())
2560            .track_latency(true)
2561            .read(
2562                |_| {
2563                    async move {
2564                        async_std::task::sleep(Duration::from_millis(100)).await;
2565                        Ok(serde_json::Value::String(old_return.to_string()))
2566                    }
2567                    .boxed()
2568                },
2569                |_| {
2570                    async move {
2571                        async_std::task::sleep(Duration::from_millis(100)).await;
2572                        Ok(serde_json::Value::String(new_return.to_string()))
2573                    }
2574                    .boxed()
2575                },
2576                Some(|lhs, rhs| lhs == rhs),
2577            )
2578            .write(
2579                |_| {
2580                    async move {
2581                        async_std::task::sleep(Duration::from_millis(100)).await;
2582                        Ok(serde_json::Value::Null)
2583                    }
2584                    .boxed()
2585                },
2586                |_| {
2587                    async move {
2588                        async_std::task::sleep(Duration::from_millis(100)).await;
2589                        Ok(serde_json::Value::Null)
2590                    }
2591                    .boxed()
2592                },
2593            )
2594            .build()
2595            .expect("migrator should build");
2596
2597        let context = ContextBuilder::new("bob")
2598            .build()
2599            .expect("Failed to create context");
2600
2601        migrator
2602            .read(
2603                &context,
2604                "stage-flag".into(),
2605                Stage::Off,
2606                serde_json::Value::Null,
2607            )
2608            .await;
2609
2610        client.flush();
2611        client.close();
2612
2613        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2614        assert_eq!(events.len(), 3);
2615        match &events[1] {
2616            OutputEvent::MigrationOp(event) => {
2617                assert!(event.consistency_check == Some(expected_consistency))
2618            }
2619            _ => panic!("Expected migration event"),
2620        }
2621    }
2622
2623    fn make_mocked_client_with_delay(
2624        delay: u64,
2625        offline: bool,
2626        daemon_mode: bool,
2627    ) -> (Client, Receiver<OutputEvent>) {
2628        let updates = Arc::new(MockDataSource::new_with_init_delay(delay));
2629        let (event_sender, event_rx) = create_event_sender();
2630
2631        let config = ConfigBuilder::new("sdk-key")
2632            .offline(offline)
2633            .daemon_mode(daemon_mode)
2634            .data_source(MockDataSourceBuilder::new().data_source(updates))
2635            .event_processor(
2636                EventProcessorBuilder::<HttpConnector>::new().event_sender(Arc::new(event_sender)),
2637            )
2638            .build()
2639            .expect("config should build");
2640
2641        let client = Client::build(config).expect("Should be built.");
2642
2643        (client, event_rx)
2644    }
2645
2646    fn make_mocked_offline_client() -> (Client, Receiver<OutputEvent>) {
2647        make_mocked_client_with_delay(0, true, false)
2648    }
2649
2650    fn make_mocked_client() -> (Client, Receiver<OutputEvent>) {
2651        make_mocked_client_with_delay(0, false, false)
2652    }
2653}