Skip to main content

launchdarkly_server_sdk/
client.rs

1use eval::Context;
2use parking_lot::RwLock;
3use std::io;
4use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
5use std::sync::{Arc, Mutex};
6use std::time::Duration;
7use tokio::runtime::Runtime;
8
9use launchdarkly_server_sdk_evaluation::{self as eval, Detail, FlagValue, PrerequisiteEvent};
10use serde::Serialize;
11use thiserror::Error;
12use tokio::sync::{broadcast, Semaphore};
13
14use super::config::Config;
15use super::data_source::{DataSource, EventReceived};
16use super::data_source_builders::BuildError as DataSourceError;
17use super::evaluation::{FlagDetail, FlagDetailConfig};
18use super::stores::store::DataStore;
19use super::stores::store_builders::BuildError as DataStoreError;
20use crate::config::BuildError as ConfigBuildError;
21use crate::events::event::EventFactory;
22use crate::events::event::InputEvent;
23use crate::events::processor::EventProcessor;
24use crate::events::processor_builders::BuildError as EventProcessorError;
25use crate::{MigrationOpTracker, Stage};
26
27struct EventsScope {
28    disabled: bool,
29    event_factory: EventFactory,
30    prerequisite_event_recorder: Box<dyn eval::PrerequisiteEventRecorder + Send + Sync>,
31}
32
33struct PrerequisiteEventRecorder {
34    event_factory: EventFactory,
35    event_processor: Arc<dyn EventProcessor>,
36}
37
38impl eval::PrerequisiteEventRecorder for PrerequisiteEventRecorder {
39    fn record(&self, event: PrerequisiteEvent) {
40        let evt = self.event_factory.new_eval_event(
41            &event.prerequisite_flag.key,
42            event.context.clone(),
43            &event.prerequisite_flag,
44            event.prerequisite_result,
45            FlagValue::Json(serde_json::Value::Null),
46            Some(event.target_flag_key),
47        );
48
49        self.event_processor.send(evt);
50    }
51}
52
53/// Error type used to represent failures when building a [Client] instance.
54#[non_exhaustive]
55#[derive(Debug, Error)]
56pub enum BuildError {
57    /// Error used when a configuration setting is invalid. This typically indicates an invalid URL.
58    #[error("invalid client config: {0}")]
59    InvalidConfig(String),
60}
61
62impl From<DataSourceError> for BuildError {
63    fn from(error: DataSourceError) -> Self {
64        Self::InvalidConfig(error.to_string())
65    }
66}
67
68impl From<DataStoreError> for BuildError {
69    fn from(error: DataStoreError) -> Self {
70        Self::InvalidConfig(error.to_string())
71    }
72}
73
74impl From<EventProcessorError> for BuildError {
75    fn from(error: EventProcessorError) -> Self {
76        Self::InvalidConfig(error.to_string())
77    }
78}
79
80impl From<ConfigBuildError> for BuildError {
81    fn from(error: ConfigBuildError) -> Self {
82        Self::InvalidConfig(error.to_string())
83    }
84}
85
86/// Error type used to represent failures when starting the [Client].
87#[non_exhaustive]
88#[derive(Debug, Error)]
89pub enum StartError {
90    /// Error used when spawning a background there fails.
91    #[error("couldn't spawn background thread for client: {0}")]
92    SpawnFailed(io::Error),
93}
94
/// Initialization progress of the client, stored elsewhere as a raw `usize` inside an atomic.
#[derive(PartialEq, Copy, Clone, Debug)]
enum ClientInitState {
    Initializing = 0,
    Initialized = 1,
    InitializationFailed = 2,
}

/// Allows comparing a state directly against the raw value read from the atomic.
impl PartialEq<usize> for ClientInitState {
    fn eq(&self, raw: &usize) -> bool {
        let discriminant = *self as usize;
        discriminant == *raw
    }
}

/// Converts a raw discriminant back into a state.
///
/// Only the values 0, 1 and 2 are valid; any other input is treated as unreachable and panics.
impl From<usize> for ClientInitState {
    fn from(val: usize) -> Self {
        const KNOWN_STATES: [ClientInitState; 3] = [
            ClientInitState::Initializing,
            ClientInitState::Initialized,
            ClientInitState::InitializationFailed,
        ];

        match KNOWN_STATES.iter().copied().find(|state| *state == val) {
            Some(state) => state,
            None => unreachable!(),
        }
    }
}
118
119/// A client for the LaunchDarkly API.
120///
121/// In order to create a client instance, first create a config using [crate::ConfigBuilder].
122///
123/// # Examples
124///
125/// Creating a client, with default configuration.
126/// ```
127/// # use launchdarkly_server_sdk::{Client, ConfigBuilder, BuildError};
128/// # fn main() -> Result<(), BuildError> {
129///     let ld_client = Client::build(ConfigBuilder::new("sdk-key").build()?)?;
130/// #   Ok(())
131/// # }
132/// ```
133///
134/// Creating an instance which connects to a relay proxy.
135/// ```
136/// # use launchdarkly_server_sdk::{Client, ConfigBuilder, ServiceEndpointsBuilder, BuildError};
137/// # fn main() -> Result<(), BuildError> {
138///     let ld_client = Client::build(ConfigBuilder::new("sdk-key")
139///         .service_endpoints(ServiceEndpointsBuilder::new()
140///             .relay_proxy("http://my-relay-hostname:8080")
141///         ).build()?
142///     )?;
143/// #   Ok(())
144/// # }
145/// ```
146///
147/// Each builder type includes usage examples for the builder.
148pub struct Client {
149    event_processor: Arc<dyn EventProcessor>,
150    data_source: Arc<dyn DataSource>,
151    data_store: Arc<RwLock<dyn DataStore>>,
152    events_default: EventsScope,
153    events_with_reasons: EventsScope,
154    init_notify: Arc<Semaphore>,
155    init_state: Arc<AtomicUsize>,
156    started: AtomicBool,
157    offline: bool,
158    daemon_mode: bool,
159    sdk_key: String,
160    shutdown_broadcast: broadcast::Sender<()>,
161    runtime: RwLock<Option<Runtime>>,
162}
163
164impl Client {
165    /// Create a new instance of a [Client] based on the provided [Config] parameter.
166    pub fn build(config: Config) -> Result<Self, BuildError> {
167        if config.offline() {
168            info!("Started LaunchDarkly Client in offline mode");
169        } else if config.daemon_mode() {
170            info!("Started LaunchDarkly Client in daemon mode");
171        }
172
173        let tags = config.application_tag();
174
175        let endpoints = config.service_endpoints_builder().build()?;
176        let event_processor =
177            config
178                .event_processor_builder()
179                .build(&endpoints, config.sdk_key(), tags.clone())?;
180        let data_source =
181            config
182                .data_source_builder()
183                .build(&endpoints, config.sdk_key(), tags.clone())?;
184        let data_store = config.data_store_builder().build()?;
185
186        let events_default = EventsScope {
187            disabled: config.offline(),
188            event_factory: EventFactory::new(false),
189            prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
190                event_factory: EventFactory::new(false),
191                event_processor: event_processor.clone(),
192            }),
193        };
194
195        let events_with_reasons = EventsScope {
196            disabled: config.offline(),
197            event_factory: EventFactory::new(true),
198            prerequisite_event_recorder: Box::new(PrerequisiteEventRecorder {
199                event_factory: EventFactory::new(true),
200                event_processor: event_processor.clone(),
201            }),
202        };
203
204        let (shutdown_tx, _) = broadcast::channel(1);
205
206        Ok(Client {
207            event_processor,
208            data_source,
209            data_store,
210            events_default,
211            events_with_reasons,
212            init_notify: Arc::new(Semaphore::new(0)),
213            init_state: Arc::new(AtomicUsize::new(ClientInitState::Initializing as usize)),
214            started: AtomicBool::new(false),
215            offline: config.offline(),
216            daemon_mode: config.daemon_mode(),
217            sdk_key: config.sdk_key().into(),
218            shutdown_broadcast: shutdown_tx,
219            runtime: RwLock::new(None),
220        })
221    }
222
223    /// Starts a client in the current thread, which must have a default tokio
224    /// runtime. This variant accepts a callback for tracking the time of the
225    /// last processed server event.
226    pub fn start_with_default_executor_and_callback(&self, event_received: EventReceived) {
227        if self.started.load(Ordering::SeqCst) {
228            return;
229        }
230        self.started.store(true, Ordering::SeqCst);
231        self.start_with_default_executor_internal(event_received);
232    }
233
234    /// Starts a client in the current thread, which must have a default tokio runtime.
235    pub fn start_with_default_executor(&self) {
236        if self.started.load(Ordering::SeqCst) {
237            return;
238        }
239        self.started.store(true, Ordering::SeqCst);
240        self.start_with_default_executor_internal(Arc::new(move |_ev| {}));
241    }
242
243    fn start_with_default_executor_internal(&self, event_received: EventReceived) {
244        // These clones are going to move into the closure, we
245        // do not want to move or reference `self`, because
246        // then lifetimes will get involved.
247        let notify = self.init_notify.clone();
248        let init_state = self.init_state.clone();
249
250        self.data_source.subscribe(
251            self.data_store.clone(),
252            Arc::new(move |success| {
253                init_state.store(
254                    (if success {
255                        ClientInitState::Initialized
256                    } else {
257                        ClientInitState::InitializationFailed
258                    }) as usize,
259                    Ordering::SeqCst,
260                );
261                notify.add_permits(1);
262            }),
263            event_received,
264            self.shutdown_broadcast.subscribe(),
265        );
266    }
267
268    /// Creates a new tokio runtime and then starts the client. Tasks from the client will
269    /// be executed on created runtime.
270    /// If your application already has a tokio runtime, then you can use
271    /// [crate::Client::start_with_default_executor] and the client will dispatch tasks to
272    /// your existing runtime.
273    pub fn start_with_runtime(&self) -> Result<bool, StartError> {
274        if self.started.load(Ordering::SeqCst) {
275            return Ok(true);
276        }
277        self.started.store(true, Ordering::SeqCst);
278
279        let runtime = Runtime::new().map_err(StartError::SpawnFailed)?;
280        let _guard = runtime.enter();
281        self.runtime.write().replace(runtime);
282
283        self.start_with_default_executor_internal(Arc::new(move |_ev| {}));
284
285        Ok(true)
286    }
287
288    /// This is an async method that will resolve once initialization is complete.
289    /// Initialization being complete does not mean that initialization was a success.
290    /// The return value from the method indicates if the client successfully initialized.
291    #[deprecated(
292        note = "blocking without a timeout is discouraged, use wait_for_initialization instead"
293    )]
294    pub async fn initialized_async(&self) -> bool {
295        self.initialized_async_internal().await
296    }
297
298    /// This is an async method that will resolve once initialization is complete or the specified
299    /// timeout has occurred.
300    ///
301    /// If the timeout is triggered, this method will return `None`. Otherwise, the method will
302    /// return a boolean indicating whether or not the SDK has successfully initialized.
303    pub async fn wait_for_initialization(&self, timeout: Duration) -> Option<bool> {
304        if timeout > Duration::from_secs(60) {
305            warn!("wait_for_initialization was configured to block for up to {} seconds. We recommend blocking no longer than 60 seconds.", timeout.as_secs());
306        }
307
308        let initialized = tokio::time::timeout(timeout, self.initialized_async_internal()).await;
309        initialized.ok()
310    }
311
312    async fn initialized_async_internal(&self) -> bool {
313        if self.offline || self.daemon_mode {
314            return true;
315        }
316
317        // If the client is not initialized, then we need to wait for it to be initialized.
318        // Because we are using atomic types, and not a lock, then there is still the possibility
319        // that the value will change between the read and when we wait. We use a semaphore to wait,
320        // and we do not forget the permit, therefore if the permit has been added, then we will get
321        // it very quickly and reduce blocking.
322        if ClientInitState::Initialized != self.init_state.load(Ordering::SeqCst) {
323            let _permit = self.init_notify.acquire().await;
324        }
325        ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
326    }
327
328    /// This function synchronously returns if the SDK is initialized.
329    /// In the case of unrecoverable errors in establishing a connection it is possible for the
330    /// SDK to never become initialized.
331    pub fn initialized(&self) -> bool {
332        self.offline
333            || self.daemon_mode
334            || ClientInitState::Initialized == self.init_state.load(Ordering::SeqCst)
335    }
336
337    /// Close shuts down the LaunchDarkly client. After calling this, the LaunchDarkly client
338    /// should no longer be used. The method will block until all pending analytics events (if any)
339    /// been sent.
340    pub fn close(&self) {
341        self.event_processor.close();
342
343        // If the system is in offline mode or daemon mode, no receiver will be listening to this
344        // broadcast channel, so sending on it would always result in an error.
345        if !self.offline && !self.daemon_mode {
346            if let Err(e) = self.shutdown_broadcast.send(()) {
347                error!("Failed to shutdown client appropriately: {}", e);
348            }
349        }
350
351        // Potentially take the runtime we created when starting the client and do nothing with it
352        // so it drops, closing out all spawned tasks.
353        self.runtime.write().take();
354    }
355
356    /// Flush tells the client that all pending analytics events (if any) should be delivered as
357    /// soon as possible. Flushing is asynchronous, so this method will return before it is
358    /// complete. However, if you call [Client::close], events are guaranteed to be sent before
359    /// that method returns.
360    ///
361    /// For more information, see the Reference Guide:
362    /// <https://docs.launchdarkly.com/sdk/features/flush#rust>.
363    pub fn flush(&self) {
364        self.event_processor.flush();
365    }
366
367    /// Identify reports details about a context.
368    ///
369    /// For more information, see the Reference Guide:
370    /// <https://docs.launchdarkly.com/sdk/features/identify#rust>
371    pub fn identify(&self, context: Context) {
372        if self.events_default.disabled {
373            return;
374        }
375
376        self.send_internal(self.events_default.event_factory.new_identify(context));
377    }
378
379    /// Returns the value of a boolean feature flag for a given context.
380    ///
381    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
382    /// off and has no off variation.
383    ///
384    /// For more information, see the Reference Guide:
385    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
386    pub fn bool_variation(&self, context: &Context, flag_key: &str, default: bool) -> bool {
387        let val = self.variation(context, flag_key, default);
388        if let Some(b) = val.as_bool() {
389            b
390        } else {
391            warn!(
392                "bool_variation called for a non-bool flag {:?} (got {:?})",
393                flag_key, val
394            );
395            default
396        }
397    }
398
399    /// Returns the value of a string feature flag for a given context.
400    ///
401    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
402    /// off and has no off variation.
403    ///
404    /// For more information, see the Reference Guide:
405    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
406    pub fn str_variation(&self, context: &Context, flag_key: &str, default: String) -> String {
407        let val = self.variation(context, flag_key, default.clone());
408        if let Some(s) = val.as_string() {
409            s
410        } else {
411            warn!(
412                "str_variation called for a non-string flag {:?} (got {:?})",
413                flag_key, val
414            );
415            default
416        }
417    }
418
419    /// Returns the value of a float feature flag for a given context.
420    ///
421    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
422    /// off and has no off variation.
423    ///
424    /// For more information, see the Reference Guide:
425    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
426    pub fn float_variation(&self, context: &Context, flag_key: &str, default: f64) -> f64 {
427        let val = self.variation(context, flag_key, default);
428        if let Some(f) = val.as_float() {
429            f
430        } else {
431            warn!(
432                "float_variation called for a non-float flag {:?} (got {:?})",
433                flag_key, val
434            );
435            default
436        }
437    }
438
    /// Returns the value of an integer feature flag for a given context.
440    ///
441    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
442    /// off and has no off variation.
443    ///
444    /// For more information, see the Reference Guide:
445    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
446    pub fn int_variation(&self, context: &Context, flag_key: &str, default: i64) -> i64 {
447        let val = self.variation(context, flag_key, default);
448        if let Some(f) = val.as_int() {
449            f
450        } else {
451            warn!(
452                "int_variation called for a non-int flag {:?} (got {:?})",
453                flag_key, val
454            );
455            default
456        }
457    }
458
459    /// Returns the value of a feature flag for the given context, allowing the value to be
460    /// of any JSON type.
461    ///
462    /// The value is returned as an [serde_json::Value].
463    ///
464    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned off.
465    ///
466    /// For more information, see the Reference Guide:
467    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
468    pub fn json_variation(
469        &self,
470        context: &Context,
471        flag_key: &str,
472        default: serde_json::Value,
473    ) -> serde_json::Value {
474        self.variation(context, flag_key, default.clone())
475            .as_json()
476            .unwrap_or(default)
477    }
478
479    /// This method is the same as [Client::bool_variation], but also returns further information
480    /// about how the value was calculated. The "reason" data will also be included in analytics
481    /// events.
482    ///
483    /// For more information, see the Reference Guide:
484    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
485    pub fn bool_variation_detail(
486        &self,
487        context: &Context,
488        flag_key: &str,
489        default: bool,
490    ) -> Detail<bool> {
491        self.variation_detail(context, flag_key, default).try_map(
492            |val| val.as_bool(),
493            default,
494            eval::Error::WrongType,
495        )
496    }
497
498    /// This method is the same as [Client::str_variation], but also returns further information
499    /// about how the value was calculated. The "reason" data will also be included in analytics
500    /// events.
501    ///
502    /// For more information, see the Reference Guide:
503    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
504    pub fn str_variation_detail(
505        &self,
506        context: &Context,
507        flag_key: &str,
508        default: String,
509    ) -> Detail<String> {
510        self.variation_detail(context, flag_key, default.clone())
511            .try_map(|val| val.as_string(), default, eval::Error::WrongType)
512    }
513
514    /// This method is the same as [Client::float_variation], but also returns further information
515    /// about how the value was calculated. The "reason" data will also be included in analytics
516    /// events.
517    ///
518    /// For more information, see the Reference Guide:
519    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
520    pub fn float_variation_detail(
521        &self,
522        context: &Context,
523        flag_key: &str,
524        default: f64,
525    ) -> Detail<f64> {
526        self.variation_detail(context, flag_key, default).try_map(
527            |val| val.as_float(),
528            default,
529            eval::Error::WrongType,
530        )
531    }
532
533    /// This method is the same as [Client::int_variation], but also returns further information
534    /// about how the value was calculated. The "reason" data will also be included in analytics
535    /// events.
536    ///
537    /// For more information, see the Reference Guide:
538    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
539    pub fn int_variation_detail(
540        &self,
541        context: &Context,
542        flag_key: &str,
543        default: i64,
544    ) -> Detail<i64> {
545        self.variation_detail(context, flag_key, default).try_map(
546            |val| val.as_int(),
547            default,
548            eval::Error::WrongType,
549        )
550    }
551
552    /// This method is the same as [Client::json_variation], but also returns further information
553    /// about how the value was calculated. The "reason" data will also be included in analytics
554    /// events.
555    ///
556    /// For more information, see the Reference Guide:
557    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
558    pub fn json_variation_detail(
559        &self,
560        context: &Context,
561        flag_key: &str,
562        default: serde_json::Value,
563    ) -> Detail<serde_json::Value> {
564        self.variation_detail(context, flag_key, default.clone())
565            .try_map(|val| val.as_json(), default, eval::Error::WrongType)
566    }
567
568    /// Generates the secure mode hash value for a context.
569    ///
570    /// For more information, see the Reference Guide:
571    /// <https://docs.launchdarkly.com/sdk/features/secure-mode#rust>.
572    pub fn secure_mode_hash(&self, context: &Context) -> String {
573        let key = aws_lc_rs::hmac::Key::new(aws_lc_rs::hmac::HMAC_SHA256, self.sdk_key.as_bytes());
574        let tag = aws_lc_rs::hmac::sign(&key, context.canonical_key().as_bytes());
575
576        data_encoding::HEXLOWER.encode(tag.as_ref())
577    }
578
579    /// Returns an object that encapsulates the state of all feature flags for a given context. This
580    /// includes the flag values, and also metadata that can be used on the front end.
581    ///
582    /// The most common use case for this method is to bootstrap a set of client-side feature flags
583    /// from a back-end service.
584    ///
585    /// You may pass any configuration of [FlagDetailConfig] to control what data is included.
586    ///
587    /// For more information, see the Reference Guide:
588    /// <https://docs.launchdarkly.com/sdk/features/all-flags#rust>
589    pub fn all_flags_detail(
590        &self,
591        context: &Context,
592        flag_state_config: FlagDetailConfig,
593    ) -> FlagDetail {
594        if self.offline {
595            warn!(
596                "all_flags_detail() called, but client is in offline mode. Returning empty state"
597            );
598            return FlagDetail::new(false);
599        }
600
601        if !self.initialized() {
602            warn!("all_flags_detail() called before client has finished initializing! Feature store unavailable - returning empty state");
603            return FlagDetail::new(false);
604        }
605
606        let data_store = self.data_store.read();
607
608        let mut flag_detail = FlagDetail::new(true);
609        flag_detail.populate(&*data_store, context, flag_state_config);
610
611        flag_detail
612    }
613
614    /// This method is the same as [Client::variation], but also returns further information about
615    /// how the value was calculated. The "reason" data will also be included in analytics events.
616    ///
617    /// For more information, see the Reference Guide:
618    /// <https://docs.launchdarkly.com/sdk/features/evaluation-reasons#rust>.
619    pub fn variation_detail<T: Into<FlagValue> + Clone>(
620        &self,
621        context: &Context,
622        flag_key: &str,
623        default: T,
624    ) -> Detail<FlagValue> {
625        let (detail, _) =
626            self.variation_internal(context, flag_key, default, &self.events_with_reasons);
627        detail
628    }
629
630    /// This is a generic function which returns the value of a feature flag for a given context.
631    ///
    /// This method is an alternative to the type-specific methods (e.g.
    /// [Client::bool_variation], [Client::int_variation], etc.).
634    ///
635    /// Returns `default` if there is an error, if the flag doesn't exist, or the feature is turned
636    /// off and has no off variation.
637    ///
638    /// For more information, see the Reference Guide:
639    /// <https://docs.launchdarkly.com/sdk/features/evaluating#rust>.
640    pub fn variation<T: Into<FlagValue> + Clone>(
641        &self,
642        context: &Context,
643        flag_key: &str,
644        default: T,
645    ) -> FlagValue {
646        let (detail, _) = self.variation_internal(context, flag_key, default, &self.events_default);
647        detail.value.unwrap()
648    }
649
650    /// This method returns the migration stage of the migration feature flag for the given
651    /// evaluation context.
652    ///
653    /// This method returns the default stage if there is an error or the flag does not exist.
654    pub fn migration_variation(
655        &self,
656        context: &Context,
657        flag_key: &str,
658        default_stage: Stage,
659    ) -> (Stage, Arc<Mutex<MigrationOpTracker>>) {
660        let (detail, flag) =
661            self.variation_internal(context, flag_key, default_stage, &self.events_default);
662
663        let migration_detail =
664            detail.try_map(|v| v.try_into().ok(), default_stage, eval::Error::WrongType);
665        let tracker = MigrationOpTracker::new(
666            flag_key.into(),
667            flag,
668            context.clone(),
669            migration_detail.clone(),
670            default_stage,
671        );
672
673        (
674            migration_detail.value.unwrap_or(default_stage),
675            Arc::new(Mutex::new(tracker)),
676        )
677    }
678
679    /// Reports that a context has performed an event.
680    ///
681    /// The `key` parameter is defined by the application and will be shown in analytics reports;
682    /// it normally corresponds to the event name of a metric that you have created through the
683    /// LaunchDarkly dashboard. If you want to associate additional data with this event, use
684    /// [Client::track_data] or [Client::track_metric].
685    ///
686    /// For more information, see the Reference Guide:
687    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
688    pub fn track_event(&self, context: Context, key: impl Into<String>) {
689        let _ = self.track(context, key, None, serde_json::Value::Null);
690    }
691
692    /// Reports that a context has performed an event, and associates it with custom data.
693    ///
694    /// The `key` parameter is defined by the application and will be shown in analytics reports;
695    /// it normally corresponds to the event name of a metric that you have created through the
696    /// LaunchDarkly dashboard.
697    ///
698    /// `data` parameter is any type that implements [Serialize]. If no such value is needed, use
699    /// [serde_json::Value::Null] (or call [Client::track_event] instead). To send a numeric value
700    /// for experimentation, use [Client::track_metric].
701    ///
702    /// For more information, see the Reference Guide:
703    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
704    pub fn track_data(
705        &self,
706        context: Context,
707        key: impl Into<String>,
708        data: impl Serialize,
709    ) -> serde_json::Result<()> {
710        self.track(context, key, None, data)
711    }
712
713    /// Reports that a context has performed an event, and associates it with a numeric value. This
714    /// value is used by the LaunchDarkly experimentation feature in numeric custom metrics, and
715    /// will also be returned as part of the custom event for Data Export.
716    ///
717    /// The `key` parameter is defined by the application and will be shown in analytics reports;
718    /// it normally corresponds to the event name of a metric that you have created through the
719    /// LaunchDarkly dashboard.
720    ///
721    /// For more information, see the Reference Guide:
722    /// <https://docs.launchdarkly.com/sdk/features/events#rust>.
723    pub fn track_metric(
724        &self,
725        context: Context,
726        key: impl Into<String>,
727        value: f64,
728        data: impl Serialize,
729    ) {
730        let _ = self.track(context, key, Some(value), data);
731    }
732
733    fn track(
734        &self,
735        context: Context,
736        key: impl Into<String>,
737        metric_value: Option<f64>,
738        data: impl Serialize,
739    ) -> serde_json::Result<()> {
740        if !self.events_default.disabled {
741            let event =
742                self.events_default
743                    .event_factory
744                    .new_custom(context, key, metric_value, data)?;
745
746            self.send_internal(event);
747        }
748
749        Ok(())
750    }
751
752    /// Tracks the results of a migrations operation. This event includes measurements which can be
753    /// used to enhance the observability of a migration within the LaunchDarkly UI.
754    ///
755    /// This event should be generated through [crate::MigrationOpTracker]. If you are using the
756    /// [crate::Migrator] to handle migrations, this event will be created and emitted
757    /// automatically.
758    pub fn track_migration_op(&self, tracker: Arc<Mutex<MigrationOpTracker>>) {
759        if self.events_default.disabled {
760            return;
761        }
762
763        match tracker.lock() {
764            Ok(tracker) => {
765                let event = tracker.build();
766                match event {
767                    Ok(event) => {
768                        self.send_internal(
769                            self.events_default.event_factory.new_migration_op(event),
770                        );
771                    }
772                    Err(e) => error!(
773                        "Failed to build migration event, no event will be sent: {}",
774                        e
775                    ),
776                }
777            }
778            Err(e) => error!(
779                "Failed to lock migration tracker, no event will be sent: {}",
780                e
781            ),
782        }
783    }
784
785    fn variation_internal<T: Into<FlagValue> + Clone>(
786        &self,
787        context: &Context,
788        flag_key: &str,
789        default: T,
790        events_scope: &EventsScope,
791    ) -> (Detail<FlagValue>, Option<eval::Flag>) {
792        if self.offline {
793            return (
794                Detail::err_default(eval::Error::ClientNotReady, default.into()),
795                None,
796            );
797        }
798
799        let (flag, result) = match self.initialized() {
800            false => (
801                None,
802                Detail::err_default(eval::Error::ClientNotReady, default.clone().into()),
803            ),
804            true => {
805                let data_store = self.data_store.read();
806                match data_store.flag(flag_key) {
807                    Some(flag) => {
808                        let result = eval::evaluate(
809                            data_store.to_store(),
810                            &flag,
811                            context,
812                            Some(&*events_scope.prerequisite_event_recorder),
813                        )
814                        .map(|v| v.clone())
815                        .or(default.clone().into());
816
817                        (Some(flag), result)
818                    }
819                    None => (
820                        None,
821                        Detail::err_default(eval::Error::FlagNotFound, default.clone().into()),
822                    ),
823                }
824            }
825        };
826
827        if !events_scope.disabled {
828            let event = match &flag {
829                Some(f) => events_scope.event_factory.new_eval_event(
830                    flag_key,
831                    context.clone(),
832                    f,
833                    result.clone(),
834                    default.into(),
835                    None,
836                ),
837                None => events_scope.event_factory.new_unknown_flag_event(
838                    flag_key,
839                    context.clone(),
840                    result.clone(),
841                    default.into(),
842                ),
843            };
844            self.send_internal(event);
845        }
846
847        (result, flag)
848    }
849
850    fn send_internal(&self, event: InputEvent) {
851        self.event_processor.send(event);
852    }
853}
854
855#[cfg(test)]
856mod tests {
857    use assert_json_diff::assert_json_eq;
858    use crossbeam_channel::Receiver;
859    use eval::{ContextBuilder, MultiContextBuilder};
860    use futures::FutureExt;
861    use hyper::client::HttpConnector;
862    use launchdarkly_server_sdk_evaluation::{Flag, Reason, Segment};
863    use maplit::hashmap;
864    use std::collections::HashMap;
865    use tokio::time::Instant;
866
867    use crate::data_source::MockDataSource;
868    use crate::data_source_builders::MockDataSourceBuilder;
869    use crate::events::create_event_sender;
870    use crate::events::event::{OutputEvent, VariationKey};
871    use crate::events::processor_builders::EventProcessorBuilder;
872    use crate::stores::persistent_store::tests::InMemoryPersistentDataStore;
873    use crate::stores::store_types::{PatchTarget, StorageItem};
874    use crate::test_common::{
875        self, basic_flag, basic_flag_with_prereq, basic_flag_with_prereqs_and_visibility,
876        basic_flag_with_visibility, basic_int_flag, basic_migration_flag, basic_off_flag,
877    };
878    use crate::{
879        AllData, ConfigBuilder, MigratorBuilder, NullEventProcessorBuilder, Operation, Origin,
880        PersistentDataStore, PersistentDataStoreBuilder, PersistentDataStoreFactory,
881        SerializedItem,
882    };
883    use test_case::test_case;
884
885    use super::*;
886
887    fn is_send_and_sync<T: Send + Sync>() {}
888
889    #[test]
890    fn ensure_client_is_send_and_sync() {
891        is_send_and_sync::<Client>()
892    }
893
894    #[tokio::test]
895    async fn client_asynchronously_initializes() {
896        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
897        client.start_with_default_executor();
898
899        let now = Instant::now();
900        let initialized = client.initialized_async().await;
901        let elapsed_time = now.elapsed();
902        assert!(initialized);
903        // Give ourself a good margin for thread scheduling.
904        assert!(elapsed_time.as_millis() > 500)
905    }
906
907    #[tokio::test]
908    async fn client_asynchronously_initializes_within_timeout() {
909        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, false);
910        client.start_with_default_executor();
911
912        let now = Instant::now();
913        let initialized = client
914            .wait_for_initialization(Duration::from_millis(1500))
915            .await;
916        let elapsed_time = now.elapsed();
917        // Give ourself a good margin for thread scheduling.
918        assert!(elapsed_time.as_millis() > 500);
919        assert_eq!(initialized, Some(true));
920    }
921
922    #[tokio::test]
923    async fn client_asynchronously_initializes_slower_than_timeout() {
924        let (client, _event_rx) = make_mocked_client_with_delay(2000, false, false);
925        client.start_with_default_executor();
926
927        let now = Instant::now();
928        let initialized = client
929            .wait_for_initialization(Duration::from_millis(500))
930            .await;
931        let elapsed_time = now.elapsed();
932        // Give ourself a good margin for thread scheduling.
933        assert!(elapsed_time.as_millis() < 750);
934        assert!(initialized.is_none());
935    }
936
937    #[tokio::test]
938    async fn client_initializes_immediately_in_offline_mode() {
939        let (client, _event_rx) = make_mocked_client_with_delay(1000, true, false);
940        client.start_with_default_executor();
941
942        assert!(client.initialized());
943
944        let now = Instant::now();
945        let initialized = client
946            .wait_for_initialization(Duration::from_millis(2000))
947            .await;
948        let elapsed_time = now.elapsed();
949        assert_eq!(initialized, Some(true));
950        assert!(elapsed_time.as_millis() < 500)
951    }
952
953    #[tokio::test]
954    async fn client_initializes_immediately_in_daemon_mode() {
955        let (client, _event_rx) = make_mocked_client_with_delay(1000, false, true);
956        client.start_with_default_executor();
957
958        assert!(client.initialized());
959
960        let now = Instant::now();
961        let initialized = client
962            .wait_for_initialization(Duration::from_millis(2000))
963            .await;
964        let elapsed_time = now.elapsed();
965        assert_eq!(initialized, Some(true));
966        assert!(elapsed_time.as_millis() < 500)
967    }
968
969    #[test_case(basic_flag("myFlag"), false.into(), true.into())]
970    #[test_case(basic_int_flag("myFlag"), 0.into(), test_common::FLOAT_TO_INT_MAX.into())]
971    fn client_updates_changes_evaluation_results(
972        flag: eval::Flag,
973        default: FlagValue,
974        expected: FlagValue,
975    ) {
976        let context = ContextBuilder::new("foo")
977            .build()
978            .expect("Failed to create context");
979
980        let (client, _event_rx) = make_mocked_client();
981
982        let result = client.variation_detail(&context, "myFlag", default.clone());
983        assert_eq!(result.value.unwrap(), default);
984
985        client.start_with_default_executor();
986        client
987            .data_store
988            .write()
989            .upsert(
990                &flag.key,
991                PatchTarget::Flag(StorageItem::Item(flag.clone())),
992            )
993            .expect("patch should apply");
994
995        let result = client.variation_detail(&context, "myFlag", default);
996        assert_eq!(result.value.unwrap(), expected);
997        assert!(matches!(
998            result.reason,
999            Reason::Fallthrough {
1000                in_experiment: false
1001            }
1002        ));
1003    }
1004
1005    #[test]
1006    fn all_flags_detail_is_invalid_when_offline() {
1007        let (client, _event_rx) = make_mocked_offline_client();
1008        client.start_with_default_executor();
1009
1010        let context = ContextBuilder::new("bob")
1011            .build()
1012            .expect("Failed to create context");
1013
1014        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1015        assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1016    }
1017
1018    #[test]
1019    fn all_flags_detail_is_invalid_when_not_initialized() {
1020        let (client, _event_rx) = make_mocked_client();
1021
1022        let context = ContextBuilder::new("bob")
1023            .build()
1024            .expect("Failed to create context");
1025
1026        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1027        assert_json_eq!(all_flags, json!({"$valid": false, "$flagsState" : {}}));
1028    }
1029
1030    #[test]
1031    fn all_flags_detail_returns_flag_states() {
1032        let (client, _event_rx) = make_mocked_client();
1033        client.start_with_default_executor();
1034        client
1035            .data_store
1036            .write()
1037            .upsert(
1038                "myFlag1",
1039                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag1"))),
1040            )
1041            .expect("patch should apply");
1042        client
1043            .data_store
1044            .write()
1045            .upsert(
1046                "myFlag2",
1047                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag2"))),
1048            )
1049            .expect("patch should apply");
1050        let context = ContextBuilder::new("bob")
1051            .build()
1052            .expect("Failed to create context");
1053
1054        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1055
1056        client.close();
1057
1058        assert_json_eq!(
1059            all_flags,
1060            json!({
1061                "myFlag1": true,
1062                "myFlag2": true,
1063                "$flagsState": {
1064                    "myFlag1": {
1065                        "version": 42,
1066                        "variation": 1
1067                    },
1068                     "myFlag2": {
1069                        "version": 42,
1070                        "variation": 1
1071                    },
1072                },
1073                "$valid": true
1074            })
1075        );
1076    }
1077
1078    #[test]
1079    fn all_flags_detail_returns_prerequisite_relations() {
1080        let (client, _event_rx) = make_mocked_client();
1081        client.start_with_default_executor();
1082        client
1083            .data_store
1084            .write()
1085            .upsert(
1086                "prereq1",
1087                PatchTarget::Flag(StorageItem::Item(basic_flag("prereq1"))),
1088            )
1089            .expect("patch should apply");
1090        client
1091            .data_store
1092            .write()
1093            .upsert(
1094                "prereq2",
1095                PatchTarget::Flag(StorageItem::Item(basic_flag("prereq2"))),
1096            )
1097            .expect("patch should apply");
1098
1099        client
1100            .data_store
1101            .write()
1102            .upsert(
1103                "toplevel",
1104                PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1105                    "toplevel",
1106                    &["prereq1", "prereq2"],
1107                    false,
1108                ))),
1109            )
1110            .expect("patch should apply");
1111
1112        let context = ContextBuilder::new("bob")
1113            .build()
1114            .expect("Failed to create context");
1115
1116        let all_flags = client.all_flags_detail(&context, FlagDetailConfig::new());
1117
1118        client.close();
1119
1120        assert_json_eq!(
1121            all_flags,
1122            json!({
1123                "prereq1": true,
1124                "prereq2": true,
1125                "toplevel": true,
1126                "$flagsState": {
1127                    "toplevel": {
1128                        "version": 42,
1129                        "variation": 1,
1130                        "prerequisites": ["prereq1", "prereq2"]
1131                    },
1132                    "prereq1": {
1133                        "version": 42,
1134                        "variation": 1
1135                    },
1136                     "prereq2": {
1137                        "version": 42,
1138                        "variation": 1
1139                    },
1140                },
1141                "$valid": true
1142            })
1143        );
1144    }
1145
1146    #[test]
1147    fn all_flags_detail_returns_prerequisite_relations_when_not_visible_to_clients() {
1148        let (client, _event_rx) = make_mocked_client();
1149        client.start_with_default_executor();
1150        client
1151            .data_store
1152            .write()
1153            .upsert(
1154                "prereq1",
1155                PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1156                    "prereq1", false,
1157                ))),
1158            )
1159            .expect("patch should apply");
1160        client
1161            .data_store
1162            .write()
1163            .upsert(
1164                "prereq2",
1165                PatchTarget::Flag(StorageItem::Item(basic_flag_with_visibility(
1166                    "prereq2", false,
1167                ))),
1168            )
1169            .expect("patch should apply");
1170
1171        client
1172            .data_store
1173            .write()
1174            .upsert(
1175                "toplevel",
1176                PatchTarget::Flag(StorageItem::Item(basic_flag_with_prereqs_and_visibility(
1177                    "toplevel",
1178                    &["prereq1", "prereq2"],
1179                    true,
1180                ))),
1181            )
1182            .expect("patch should apply");
1183
1184        let context = ContextBuilder::new("bob")
1185            .build()
1186            .expect("Failed to create context");
1187
1188        let mut config = FlagDetailConfig::new();
1189        config.client_side_only();
1190
1191        let all_flags = client.all_flags_detail(&context, config);
1192
1193        client.close();
1194
1195        assert_json_eq!(
1196            all_flags,
1197            json!({
1198                "toplevel": true,
1199                "$flagsState": {
1200                    "toplevel": {
1201                        "version": 42,
1202                        "variation": 1,
1203                        "prerequisites": ["prereq1", "prereq2"]
1204                    },
1205                },
1206                "$valid": true
1207            })
1208        );
1209    }
1210
1211    #[test]
1212    fn variation_tracks_events_correctly() {
1213        let (client, event_rx) = make_mocked_client();
1214        client.start_with_default_executor();
1215        client
1216            .data_store
1217            .write()
1218            .upsert(
1219                "myFlag",
1220                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1221            )
1222            .expect("patch should apply");
1223        let context = ContextBuilder::new("bob")
1224            .build()
1225            .expect("Failed to create context");
1226
1227        let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1228
1229        assert!(flag_value.as_bool().unwrap());
1230        client.flush();
1231        client.close();
1232
1233        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1234        assert_eq!(events.len(), 2);
1235        assert_eq!(events[0].kind(), "index");
1236        assert_eq!(events[1].kind(), "summary");
1237
1238        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1239            let variation_key = VariationKey {
1240                version: Some(42),
1241                variation: Some(1),
1242            };
1243            let feature = event_summary.features.get("myFlag");
1244            assert!(feature.is_some());
1245
1246            let feature = feature.unwrap();
1247            assert!(feature.counters.contains_key(&variation_key));
1248        } else {
1249            panic!("Event should be a summary type");
1250        }
1251    }
1252
1253    #[test]
1254    fn variation_handles_offline_mode() {
1255        let (client, event_rx) = make_mocked_offline_client();
1256        client.start_with_default_executor();
1257
1258        let context = ContextBuilder::new("bob")
1259            .build()
1260            .expect("Failed to create context");
1261        let flag_value = client.variation(&context, "myFlag", FlagValue::Bool(false));
1262
1263        assert!(!flag_value.as_bool().unwrap());
1264        client.flush();
1265        client.close();
1266
1267        assert_eq!(event_rx.iter().count(), 0);
1268    }
1269
1270    #[test]
1271    fn variation_handles_unknown_flags() {
1272        let (client, event_rx) = make_mocked_client();
1273        client.start_with_default_executor();
1274        let context = ContextBuilder::new("bob")
1275            .build()
1276            .expect("Failed to create context");
1277
1278        let flag_value = client.variation(&context, "non-existent-flag", FlagValue::Bool(false));
1279
1280        assert!(!flag_value.as_bool().unwrap());
1281        client.flush();
1282        client.close();
1283
1284        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1285        assert_eq!(events.len(), 2);
1286        assert_eq!(events[0].kind(), "index");
1287        assert_eq!(events[1].kind(), "summary");
1288
1289        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1290            let variation_key = VariationKey {
1291                version: None,
1292                variation: None,
1293            };
1294
1295            let feature = event_summary.features.get("non-existent-flag");
1296            assert!(feature.is_some());
1297
1298            let feature = feature.unwrap();
1299            assert!(feature.counters.contains_key(&variation_key));
1300        } else {
1301            panic!("Event should be a summary type");
1302        }
1303    }
1304
1305    #[test]
1306    fn variation_detail_handles_debug_events_correctly() {
1307        let (client, event_rx) = make_mocked_client();
1308        client.start_with_default_executor();
1309
1310        let mut flag = basic_flag("myFlag");
1311        flag.debug_events_until_date = Some(64_060_606_800_000); // Jan. 1st, 4000
1312
1313        client
1314            .data_store
1315            .write()
1316            .upsert(
1317                &flag.key,
1318                PatchTarget::Flag(StorageItem::Item(flag.clone())),
1319            )
1320            .expect("patch should apply");
1321        let context = ContextBuilder::new("bob")
1322            .build()
1323            .expect("Failed to create context");
1324
1325        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1326
1327        assert!(detail.value.unwrap().as_bool().unwrap());
1328        assert!(matches!(
1329            detail.reason,
1330            Reason::Fallthrough {
1331                in_experiment: false
1332            }
1333        ));
1334        client.flush();
1335        client.close();
1336
1337        let events = event_rx.try_iter().collect::<Vec<OutputEvent>>();
1338        assert_eq!(events.len(), 3);
1339        assert_eq!(events[0].kind(), "index");
1340        assert_eq!(events[1].kind(), "debug");
1341        assert_eq!(events[2].kind(), "summary");
1342
1343        if let OutputEvent::Summary(event_summary) = events[2].clone() {
1344            let variation_key = VariationKey {
1345                version: Some(42),
1346                variation: Some(1),
1347            };
1348
1349            let feature = event_summary.features.get("myFlag");
1350            assert!(feature.is_some());
1351
1352            let feature = feature.unwrap();
1353            assert!(feature.counters.contains_key(&variation_key));
1354        } else {
1355            panic!("Event should be a summary type");
1356        }
1357    }
1358
1359    #[test]
1360    fn variation_detail_tracks_events_correctly() {
1361        let (client, event_rx) = make_mocked_client();
1362        client.start_with_default_executor();
1363
1364        client
1365            .data_store
1366            .write()
1367            .upsert(
1368                "myFlag",
1369                PatchTarget::Flag(StorageItem::Item(basic_flag("myFlag"))),
1370            )
1371            .expect("patch should apply");
1372        let context = ContextBuilder::new("bob")
1373            .build()
1374            .expect("Failed to create context");
1375
1376        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1377
1378        assert!(detail.value.unwrap().as_bool().unwrap());
1379        assert!(matches!(
1380            detail.reason,
1381            Reason::Fallthrough {
1382                in_experiment: false
1383            }
1384        ));
1385        client.flush();
1386        client.close();
1387
1388        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1389        assert_eq!(events.len(), 2);
1390        assert_eq!(events[0].kind(), "index");
1391        assert_eq!(events[1].kind(), "summary");
1392
1393        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1394            let variation_key = VariationKey {
1395                version: Some(42),
1396                variation: Some(1),
1397            };
1398
1399            let feature = event_summary.features.get("myFlag");
1400            assert!(feature.is_some());
1401
1402            let feature = feature.unwrap();
1403            assert!(feature.counters.contains_key(&variation_key));
1404        } else {
1405            panic!("Event should be a summary type");
1406        }
1407    }
1408
1409    #[test]
1410    fn variation_detail_handles_offline_mode() {
1411        let (client, event_rx) = make_mocked_offline_client();
1412        client.start_with_default_executor();
1413
1414        let context = ContextBuilder::new("bob")
1415            .build()
1416            .expect("Failed to create context");
1417
1418        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1419
1420        assert!(!detail.value.unwrap().as_bool().unwrap());
1421        assert!(matches!(
1422            detail.reason,
1423            Reason::Error {
1424                error: eval::Error::ClientNotReady
1425            }
1426        ));
1427        client.flush();
1428        client.close();
1429
1430        assert_eq!(event_rx.iter().count(), 0);
1431    }
1432
1433    struct InMemoryPersistentDataStoreFactory {
1434        data: AllData<Flag, Segment>,
1435        initialized: bool,
1436    }
1437
1438    impl PersistentDataStoreFactory for InMemoryPersistentDataStoreFactory {
1439        fn create_persistent_data_store(
1440            &self,
1441        ) -> Result<Box<(dyn PersistentDataStore + 'static)>, std::io::Error> {
1442            let serialized_data =
1443                AllData::<SerializedItem, SerializedItem>::try_from(self.data.clone())?;
1444            Ok(Box::new(InMemoryPersistentDataStore {
1445                data: serialized_data,
1446                initialized: self.initialized,
1447            }))
1448        }
1449    }
1450
1451    #[test]
1452    fn variation_detail_handles_daemon_mode() {
1453        testing_logger::setup();
1454        let factory = InMemoryPersistentDataStoreFactory {
1455            data: AllData {
1456                flags: hashmap!["flag".into() => basic_flag("flag")],
1457                segments: HashMap::new(),
1458            },
1459            initialized: true,
1460        };
1461        let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1462
1463        let config = ConfigBuilder::new("sdk-key")
1464            .daemon_mode(true)
1465            .data_store(&builder)
1466            .event_processor(&NullEventProcessorBuilder::new())
1467            .build()
1468            .expect("config should build");
1469
1470        let client = Client::build(config).expect("Should be built.");
1471
1472        client.start_with_default_executor();
1473
1474        let context = ContextBuilder::new("bob")
1475            .build()
1476            .expect("Failed to create context");
1477
1478        let detail = client.variation_detail(&context, "flag", FlagValue::Bool(false));
1479
1480        assert!(detail.value.unwrap().as_bool().unwrap());
1481        assert!(matches!(
1482            detail.reason,
1483            Reason::Fallthrough {
1484                in_experiment: false
1485            }
1486        ));
1487        client.flush();
1488        client.close();
1489
1490        testing_logger::validate(|captured_logs| {
1491            assert_eq!(captured_logs.len(), 1);
1492            assert_eq!(
1493                captured_logs[0].body,
1494                "Started LaunchDarkly Client in daemon mode"
1495            );
1496        });
1497    }
1498
1499    #[test]
1500    fn daemon_mode_is_quiet_if_store_is_not_initialized() {
1501        testing_logger::setup();
1502
1503        let factory = InMemoryPersistentDataStoreFactory {
1504            data: AllData {
1505                flags: HashMap::new(),
1506                segments: HashMap::new(),
1507            },
1508            initialized: false,
1509        };
1510        let builder = PersistentDataStoreBuilder::new(Arc::new(factory));
1511
1512        let config = ConfigBuilder::new("sdk-key")
1513            .daemon_mode(true)
1514            .data_store(&builder)
1515            .event_processor(&NullEventProcessorBuilder::new())
1516            .build()
1517            .expect("config should build");
1518
1519        let client = Client::build(config).expect("Should be built.");
1520
1521        client.start_with_default_executor();
1522
1523        let context = ContextBuilder::new("bob")
1524            .build()
1525            .expect("Failed to create context");
1526
1527        client.variation_detail(&context, "flag", FlagValue::Bool(false));
1528
1529        testing_logger::validate(|captured_logs| {
1530            assert_eq!(captured_logs.len(), 1);
1531            assert_eq!(
1532                captured_logs[0].body,
1533                "Started LaunchDarkly Client in daemon mode"
1534            );
1535        });
1536    }
1537
1538    #[test]
1539    fn variation_handles_off_flag_without_variation() {
1540        let (client, event_rx) = make_mocked_client();
1541        client.start_with_default_executor();
1542
1543        client
1544            .data_store
1545            .write()
1546            .upsert(
1547                "myFlag",
1548                PatchTarget::Flag(StorageItem::Item(basic_off_flag("myFlag"))),
1549            )
1550            .expect("patch should apply");
1551        let context = ContextBuilder::new("bob")
1552            .build()
1553            .expect("Failed to create context");
1554
1555        let result = client.variation(&context, "myFlag", FlagValue::Bool(false));
1556
1557        assert!(!result.as_bool().unwrap());
1558        client.flush();
1559        client.close();
1560
1561        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1562        assert_eq!(events.len(), 2);
1563        assert_eq!(events[0].kind(), "index");
1564        assert_eq!(events[1].kind(), "summary");
1565
1566        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1567            let variation_key = VariationKey {
1568                version: Some(42),
1569                variation: None,
1570            };
1571            let feature = event_summary.features.get("myFlag");
1572            assert!(feature.is_some());
1573
1574            let feature = feature.unwrap();
1575            assert!(feature.counters.contains_key(&variation_key));
1576        } else {
1577            panic!("Event should be a summary type");
1578        }
1579    }
1580
1581    #[test]
1582    fn variation_detail_tracks_prereq_events_correctly() {
1583        let (client, event_rx) = make_mocked_client();
1584        client.start_with_default_executor();
1585
1586        let mut basic_preqreq_flag = basic_flag("prereqFlag");
1587        basic_preqreq_flag.track_events = true;
1588
1589        client
1590            .data_store
1591            .write()
1592            .upsert(
1593                "prereqFlag",
1594                PatchTarget::Flag(StorageItem::Item(basic_preqreq_flag)),
1595            )
1596            .expect("patch should apply");
1597
1598        let mut basic_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1599        basic_flag.track_events = true;
1600        client
1601            .data_store
1602            .write()
1603            .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(basic_flag)))
1604            .expect("patch should apply");
1605        let context = ContextBuilder::new("bob")
1606            .build()
1607            .expect("Failed to create context");
1608
1609        let detail = client.variation_detail(&context, "myFlag", FlagValue::Bool(false));
1610
1611        assert!(detail.value.unwrap().as_bool().unwrap());
1612        assert!(matches!(
1613            detail.reason,
1614            Reason::Fallthrough {
1615                in_experiment: false
1616            }
1617        ));
1618        client.flush();
1619        client.close();
1620
1621        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1622        assert_eq!(events.len(), 4);
1623        assert_eq!(events[0].kind(), "index");
1624        assert_eq!(events[1].kind(), "feature");
1625        assert_eq!(events[2].kind(), "feature");
1626        assert_eq!(events[3].kind(), "summary");
1627
1628        if let OutputEvent::Summary(event_summary) = events[3].clone() {
1629            let variation_key = VariationKey {
1630                version: Some(42),
1631                variation: Some(1),
1632            };
1633            let feature = event_summary.features.get("myFlag");
1634            assert!(feature.is_some());
1635
1636            let feature = feature.unwrap();
1637            assert!(feature.counters.contains_key(&variation_key));
1638
1639            let variation_key = VariationKey {
1640                version: Some(42),
1641                variation: Some(1),
1642            };
1643            let feature = event_summary.features.get("prereqFlag");
1644            assert!(feature.is_some());
1645
1646            let feature = feature.unwrap();
1647            assert!(feature.counters.contains_key(&variation_key));
1648        }
1649    }
1650
1651    #[test]
1652    fn variation_handles_failed_prereqs_correctly() {
1653        let (client, event_rx) = make_mocked_client();
1654        client.start_with_default_executor();
1655
1656        let mut basic_preqreq_flag = basic_off_flag("prereqFlag");
1657        basic_preqreq_flag.track_events = true;
1658
1659        client
1660            .data_store
1661            .write()
1662            .upsert(
1663                "prereqFlag",
1664                PatchTarget::Flag(StorageItem::Item(basic_preqreq_flag)),
1665            )
1666            .expect("patch should apply");
1667
1668        let mut basic_flag = basic_flag_with_prereq("myFlag", "prereqFlag");
1669        basic_flag.track_events = true;
1670        client
1671            .data_store
1672            .write()
1673            .upsert("myFlag", PatchTarget::Flag(StorageItem::Item(basic_flag)))
1674            .expect("patch should apply");
1675        let context = ContextBuilder::new("bob")
1676            .build()
1677            .expect("Failed to create context");
1678
1679        let detail = client.variation(&context, "myFlag", FlagValue::Bool(false));
1680
1681        assert!(!detail.as_bool().unwrap());
1682        client.flush();
1683        client.close();
1684
1685        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1686        assert_eq!(events.len(), 4);
1687        assert_eq!(events[0].kind(), "index");
1688        assert_eq!(events[1].kind(), "feature");
1689        assert_eq!(events[2].kind(), "feature");
1690        assert_eq!(events[3].kind(), "summary");
1691
1692        if let OutputEvent::Summary(event_summary) = events[3].clone() {
1693            let variation_key = VariationKey {
1694                version: Some(42),
1695                variation: Some(0),
1696            };
1697            let feature = event_summary.features.get("myFlag");
1698            assert!(feature.is_some());
1699
1700            let feature = feature.unwrap();
1701            assert!(feature.counters.contains_key(&variation_key));
1702
1703            let variation_key = VariationKey {
1704                version: Some(42),
1705                variation: None,
1706            };
1707            let feature = event_summary.features.get("prereqFlag");
1708            assert!(feature.is_some());
1709
1710            let feature = feature.unwrap();
1711            assert!(feature.counters.contains_key(&variation_key));
1712        }
1713    }
1714
1715    #[test]
1716    fn variation_detail_handles_flag_not_found() {
1717        let (client, event_rx) = make_mocked_client();
1718        client.start_with_default_executor();
1719
1720        let context = ContextBuilder::new("bob")
1721            .build()
1722            .expect("Failed to create context");
1723        let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1724
1725        assert!(!detail.value.unwrap().as_bool().unwrap());
1726        assert!(matches!(
1727            detail.reason,
1728            Reason::Error {
1729                error: eval::Error::FlagNotFound
1730            }
1731        ));
1732        client.flush();
1733        client.close();
1734
1735        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1736        assert_eq!(events.len(), 2);
1737        assert_eq!(events[0].kind(), "index");
1738        assert_eq!(events[1].kind(), "summary");
1739
1740        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1741            let variation_key = VariationKey {
1742                version: None,
1743                variation: None,
1744            };
1745            let feature = event_summary.features.get("non-existent-flag");
1746            assert!(feature.is_some());
1747
1748            let feature = feature.unwrap();
1749            assert!(feature.counters.contains_key(&variation_key));
1750        } else {
1751            panic!("Event should be a summary type");
1752        }
1753    }
1754
1755    #[tokio::test]
1756    async fn variation_detail_handles_client_not_ready() {
1757        let (client, event_rx) = make_mocked_client_with_delay(u64::MAX, false, false);
1758        client.start_with_default_executor();
1759        let context = ContextBuilder::new("bob")
1760            .build()
1761            .expect("Failed to create context");
1762
1763        let detail = client.variation_detail(&context, "non-existent-flag", FlagValue::Bool(false));
1764
1765        assert!(!detail.value.unwrap().as_bool().unwrap());
1766        assert!(matches!(
1767            detail.reason,
1768            Reason::Error {
1769                error: eval::Error::ClientNotReady
1770            }
1771        ));
1772        client.flush();
1773        client.close();
1774
1775        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1776        assert_eq!(events.len(), 2);
1777        assert_eq!(events[0].kind(), "index");
1778        assert_eq!(events[1].kind(), "summary");
1779
1780        if let OutputEvent::Summary(event_summary) = events[1].clone() {
1781            let variation_key = VariationKey {
1782                version: None,
1783                variation: None,
1784            };
1785            let feature = event_summary.features.get("non-existent-flag");
1786            assert!(feature.is_some());
1787
1788            let feature = feature.unwrap();
1789            assert!(feature.counters.contains_key(&variation_key));
1790        } else {
1791            panic!("Event should be a summary type");
1792        }
1793    }
1794
1795    #[test]
1796    fn identify_sends_identify_event() {
1797        let (client, event_rx) = make_mocked_client();
1798        client.start_with_default_executor();
1799
1800        let context = ContextBuilder::new("bob")
1801            .build()
1802            .expect("Failed to create context");
1803
1804        client.identify(context);
1805        client.flush();
1806        client.close();
1807
1808        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1809        assert_eq!(events.len(), 1);
1810        assert_eq!(events[0].kind(), "identify");
1811    }
1812
1813    #[test]
1814    fn identify_sends_sends_nothing_in_offline_mode() {
1815        let (client, event_rx) = make_mocked_offline_client();
1816        client.start_with_default_executor();
1817
1818        let context = ContextBuilder::new("bob")
1819            .build()
1820            .expect("Failed to create context");
1821
1822        client.identify(context);
1823        client.flush();
1824        client.close();
1825
1826        assert_eq!(event_rx.iter().count(), 0);
1827    }
1828
1829    #[test]
1830    fn secure_mode_hash() {
1831        let config = ConfigBuilder::new("secret")
1832            .offline(true)
1833            .build()
1834            .expect("config should build");
1835        let client = Client::build(config).expect("Should be built.");
1836        let context = ContextBuilder::new("Message")
1837            .build()
1838            .expect("Failed to create context");
1839
1840        assert_eq!(
1841            client.secure_mode_hash(&context),
1842            "aa747c502a898200f9e4fa21bac68136f886a0e27aec70ba06daf2e2a5cb5597"
1843        );
1844    }
1845
1846    #[test]
1847    fn secure_mode_hash_with_multi_kind() {
1848        let config = ConfigBuilder::new("secret")
1849            .offline(true)
1850            .build()
1851            .expect("config should build");
1852        let client = Client::build(config).expect("Should be built.");
1853
1854        let org = ContextBuilder::new("org-key|1")
1855            .kind("org")
1856            .build()
1857            .expect("Failed to create context");
1858        let user = ContextBuilder::new("user-key:2")
1859            .build()
1860            .expect("Failed to create context");
1861
1862        let context = MultiContextBuilder::new()
1863            .add_context(org)
1864            .add_context(user)
1865            .build()
1866            .expect("failed to build multi-context");
1867
1868        assert_eq!(
1869            client.secure_mode_hash(&context),
1870            "5687e6383b920582ed50c2a96c98a115f1b6aad85a60579d761d9b8797415163"
1871        );
1872    }
1873
    /// Serializable payload used by the tracking tests as custom event data.
    #[derive(Serialize)]
    struct MyCustomData {
        /// The single value carried by the payload.
        pub answer: u32,
    }
1878
1879    #[test]
1880    fn track_sends_track_and_index_events() -> serde_json::Result<()> {
1881        let (client, event_rx) = make_mocked_client();
1882        client.start_with_default_executor();
1883
1884        let context = ContextBuilder::new("bob")
1885            .build()
1886            .expect("Failed to create context");
1887
1888        client.track_event(context.clone(), "event-with-null");
1889        client.track_data(context.clone(), "event-with-string", "string-data")?;
1890        client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1891        client.track_data(
1892            context.clone(),
1893            "event-with-struct",
1894            MyCustomData { answer: 42 },
1895        )?;
1896        client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1897
1898        client.flush();
1899        client.close();
1900
1901        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
1902        assert_eq!(events.len(), 6);
1903
1904        let mut events_by_type: HashMap<&str, usize> = HashMap::new();
1905        for event in events {
1906            if let Some(count) = events_by_type.get_mut(event.kind()) {
1907                *count += 1;
1908            } else {
1909                events_by_type.insert(event.kind(), 1);
1910            }
1911        }
1912        assert!(matches!(events_by_type.get("index"), Some(1)));
1913        assert!(matches!(events_by_type.get("custom"), Some(5)));
1914
1915        Ok(())
1916    }
1917
1918    #[test]
1919    fn track_sends_nothing_in_offline_mode() -> serde_json::Result<()> {
1920        let (client, event_rx) = make_mocked_offline_client();
1921        client.start_with_default_executor();
1922
1923        let context = ContextBuilder::new("bob")
1924            .build()
1925            .expect("Failed to create context");
1926
1927        client.track_event(context.clone(), "event-with-null");
1928        client.track_data(context.clone(), "event-with-string", "string-data")?;
1929        client.track_data(context.clone(), "event-with-json", json!({"answer": 42}))?;
1930        client.track_data(
1931            context.clone(),
1932            "event-with-struct",
1933            MyCustomData { answer: 42 },
1934        )?;
1935        client.track_metric(context, "event-with-metric", 42.0, serde_json::Value::Null);
1936
1937        client.flush();
1938        client.close();
1939
1940        assert_eq!(event_rx.iter().count(), 0);
1941
1942        Ok(())
1943    }
1944
1945    #[test]
1946    fn migration_handles_flag_not_found() {
1947        let (client, _event_rx) = make_mocked_client();
1948        client.start_with_default_executor();
1949
1950        let context = ContextBuilder::new("bob")
1951            .build()
1952            .expect("Failed to create context");
1953
1954        let (stage, _tracker) =
1955            client.migration_variation(&context, "non-existent-flag-key", Stage::Off);
1956
1957        assert_eq!(stage, Stage::Off);
1958    }
1959
1960    #[test]
1961    fn migration_uses_non_migration_flag() {
1962        let (client, _event_rx) = make_mocked_client();
1963        client.start_with_default_executor();
1964        client
1965            .data_store
1966            .write()
1967            .upsert(
1968                "boolean-flag",
1969                PatchTarget::Flag(StorageItem::Item(basic_flag("boolean-flag"))),
1970            )
1971            .expect("patch should apply");
1972
1973        let context = ContextBuilder::new("bob")
1974            .build()
1975            .expect("Failed to create context");
1976
1977        let (stage, _tracker) = client.migration_variation(&context, "boolean-flag", Stage::Off);
1978
1979        assert_eq!(stage, Stage::Off);
1980    }
1981
1982    #[test_case(Stage::Off)]
1983    #[test_case(Stage::DualWrite)]
1984    #[test_case(Stage::Shadow)]
1985    #[test_case(Stage::Live)]
1986    #[test_case(Stage::Rampdown)]
1987    #[test_case(Stage::Complete)]
1988    fn migration_can_determine_correct_stage_from_flag(stage: Stage) {
1989        let (client, _event_rx) = make_mocked_client();
1990        client.start_with_default_executor();
1991        client
1992            .data_store
1993            .write()
1994            .upsert(
1995                "stage-flag",
1996                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
1997            )
1998            .expect("patch should apply");
1999
2000        let context = ContextBuilder::new("bob")
2001            .build()
2002            .expect("Failed to create context");
2003
2004        let (evaluated_stage, _tracker) =
2005            client.migration_variation(&context, "stage-flag", Stage::Off);
2006
2007        assert_eq!(evaluated_stage, stage);
2008    }
2009
2010    #[tokio::test]
2011    async fn migration_tracks_invoked_correctly() {
2012        migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Read, vec![Origin::Old])
2013            .await;
2014        migration_tracks_invoked_correctly_driver(
2015            Stage::DualWrite,
2016            Operation::Read,
2017            vec![Origin::Old],
2018        )
2019        .await;
2020        migration_tracks_invoked_correctly_driver(
2021            Stage::Shadow,
2022            Operation::Read,
2023            vec![Origin::Old, Origin::New],
2024        )
2025        .await;
2026        migration_tracks_invoked_correctly_driver(
2027            Stage::Live,
2028            Operation::Read,
2029            vec![Origin::Old, Origin::New],
2030        )
2031        .await;
2032        migration_tracks_invoked_correctly_driver(
2033            Stage::Rampdown,
2034            Operation::Read,
2035            vec![Origin::New],
2036        )
2037        .await;
2038        migration_tracks_invoked_correctly_driver(
2039            Stage::Complete,
2040            Operation::Read,
2041            vec![Origin::New],
2042        )
2043        .await;
2044        migration_tracks_invoked_correctly_driver(Stage::Off, Operation::Write, vec![Origin::Old])
2045            .await;
2046        migration_tracks_invoked_correctly_driver(
2047            Stage::DualWrite,
2048            Operation::Write,
2049            vec![Origin::Old, Origin::New],
2050        )
2051        .await;
2052        migration_tracks_invoked_correctly_driver(
2053            Stage::Shadow,
2054            Operation::Write,
2055            vec![Origin::Old, Origin::New],
2056        )
2057        .await;
2058        migration_tracks_invoked_correctly_driver(
2059            Stage::Live,
2060            Operation::Write,
2061            vec![Origin::Old, Origin::New],
2062        )
2063        .await;
2064        migration_tracks_invoked_correctly_driver(
2065            Stage::Rampdown,
2066            Operation::Write,
2067            vec![Origin::Old, Origin::New],
2068        )
2069        .await;
2070        migration_tracks_invoked_correctly_driver(
2071            Stage::Complete,
2072            Operation::Write,
2073            vec![Origin::New],
2074        )
2075        .await;
2076    }
2077
2078    async fn migration_tracks_invoked_correctly_driver(
2079        stage: Stage,
2080        operation: Operation,
2081        origins: Vec<Origin>,
2082    ) {
2083        let (client, event_rx) = make_mocked_client();
2084        let client = Arc::new(client);
2085        client.start_with_default_executor();
2086        client
2087            .data_store
2088            .write()
2089            .upsert(
2090                "stage-flag",
2091                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2092            )
2093            .expect("patch should apply");
2094
2095        let mut migrator = MigratorBuilder::new(client.clone())
2096            .read(
2097                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2098                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2099                Some(|_, _| true),
2100            )
2101            .write(
2102                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2103                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2104            )
2105            .build()
2106            .expect("migrator should build");
2107
2108        let context = ContextBuilder::new("bob")
2109            .build()
2110            .expect("Failed to create context");
2111
2112        if let Operation::Read = operation {
2113            migrator
2114                .read(
2115                    &context,
2116                    "stage-flag".into(),
2117                    Stage::Off,
2118                    serde_json::Value::Null,
2119                )
2120                .await;
2121        } else {
2122            migrator
2123                .write(
2124                    &context,
2125                    "stage-flag".into(),
2126                    Stage::Off,
2127                    serde_json::Value::Null,
2128                )
2129                .await;
2130        }
2131
2132        client.flush();
2133        client.close();
2134
2135        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2136        assert_eq!(events.len(), 3);
2137        match &events[1] {
2138            OutputEvent::MigrationOp(event) => {
2139                assert!(event.invoked.len() == origins.len());
2140                assert!(event.invoked.iter().all(|i| origins.contains(i)));
2141            }
2142            _ => panic!("Expected migration event"),
2143        }
2144    }
2145
2146    #[tokio::test]
2147    async fn migration_tracks_latency() {
2148        migration_tracks_latency_driver(Stage::Off, Operation::Read, vec![Origin::Old]).await;
2149        migration_tracks_latency_driver(Stage::DualWrite, Operation::Read, vec![Origin::Old]).await;
2150        migration_tracks_latency_driver(
2151            Stage::Shadow,
2152            Operation::Read,
2153            vec![Origin::Old, Origin::New],
2154        )
2155        .await;
2156        migration_tracks_latency_driver(
2157            Stage::Live,
2158            Operation::Read,
2159            vec![Origin::Old, Origin::New],
2160        )
2161        .await;
2162        migration_tracks_latency_driver(Stage::Rampdown, Operation::Read, vec![Origin::New]).await;
2163        migration_tracks_latency_driver(Stage::Complete, Operation::Read, vec![Origin::New]).await;
2164        migration_tracks_latency_driver(Stage::Off, Operation::Write, vec![Origin::Old]).await;
2165        migration_tracks_latency_driver(
2166            Stage::DualWrite,
2167            Operation::Write,
2168            vec![Origin::Old, Origin::New],
2169        )
2170        .await;
2171        migration_tracks_latency_driver(
2172            Stage::Shadow,
2173            Operation::Write,
2174            vec![Origin::Old, Origin::New],
2175        )
2176        .await;
2177        migration_tracks_latency_driver(
2178            Stage::Live,
2179            Operation::Write,
2180            vec![Origin::Old, Origin::New],
2181        )
2182        .await;
2183        migration_tracks_latency_driver(
2184            Stage::Rampdown,
2185            Operation::Write,
2186            vec![Origin::Old, Origin::New],
2187        )
2188        .await;
2189        migration_tracks_latency_driver(Stage::Complete, Operation::Write, vec![Origin::New]).await;
2190    }
2191
2192    async fn migration_tracks_latency_driver(
2193        stage: Stage,
2194        operation: Operation,
2195        origins: Vec<Origin>,
2196    ) {
2197        let (client, event_rx) = make_mocked_client();
2198        let client = Arc::new(client);
2199        client.start_with_default_executor();
2200        client
2201            .data_store
2202            .write()
2203            .upsert(
2204                "stage-flag",
2205                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2206            )
2207            .expect("patch should apply");
2208
2209        let mut migrator = MigratorBuilder::new(client.clone())
2210            .track_latency(true)
2211            .read(
2212                |_| {
2213                    async move {
2214                        async_std::task::sleep(Duration::from_millis(100)).await;
2215                        Ok(serde_json::Value::Null)
2216                    }
2217                    .boxed()
2218                },
2219                |_| {
2220                    async move {
2221                        async_std::task::sleep(Duration::from_millis(100)).await;
2222                        Ok(serde_json::Value::Null)
2223                    }
2224                    .boxed()
2225                },
2226                Some(|_, _| true),
2227            )
2228            .write(
2229                |_| {
2230                    async move {
2231                        async_std::task::sleep(Duration::from_millis(100)).await;
2232                        Ok(serde_json::Value::Null)
2233                    }
2234                    .boxed()
2235                },
2236                |_| {
2237                    async move {
2238                        async_std::task::sleep(Duration::from_millis(100)).await;
2239                        Ok(serde_json::Value::Null)
2240                    }
2241                    .boxed()
2242                },
2243            )
2244            .build()
2245            .expect("migrator should build");
2246
2247        let context = ContextBuilder::new("bob")
2248            .build()
2249            .expect("Failed to create context");
2250
2251        if let Operation::Read = operation {
2252            migrator
2253                .read(
2254                    &context,
2255                    "stage-flag".into(),
2256                    Stage::Off,
2257                    serde_json::Value::Null,
2258                )
2259                .await;
2260        } else {
2261            migrator
2262                .write(
2263                    &context,
2264                    "stage-flag".into(),
2265                    Stage::Off,
2266                    serde_json::Value::Null,
2267                )
2268                .await;
2269        }
2270
2271        client.flush();
2272        client.close();
2273
2274        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2275        assert_eq!(events.len(), 3);
2276        match &events[1] {
2277            OutputEvent::MigrationOp(event) => {
2278                assert!(event.latency.len() == origins.len());
2279                assert!(event
2280                    .latency
2281                    .values()
2282                    .all(|l| l > &Duration::from_millis(100)));
2283            }
2284            _ => panic!("Expected migration event"),
2285        }
2286    }
2287
2288    #[tokio::test]
2289    async fn migration_tracks_read_errors() {
2290        migration_tracks_read_errors_driver(Stage::Off, vec![Origin::Old]).await;
2291        migration_tracks_read_errors_driver(Stage::DualWrite, vec![Origin::Old]).await;
2292        migration_tracks_read_errors_driver(Stage::Shadow, vec![Origin::Old, Origin::New]).await;
2293        migration_tracks_read_errors_driver(Stage::Live, vec![Origin::Old, Origin::New]).await;
2294        migration_tracks_read_errors_driver(Stage::Rampdown, vec![Origin::New]).await;
2295        migration_tracks_read_errors_driver(Stage::Complete, vec![Origin::New]).await;
2296    }
2297
2298    async fn migration_tracks_read_errors_driver(stage: Stage, origins: Vec<Origin>) {
2299        let (client, event_rx) = make_mocked_client();
2300        let client = Arc::new(client);
2301        client.start_with_default_executor();
2302        client
2303            .data_store
2304            .write()
2305            .upsert(
2306                "stage-flag",
2307                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2308            )
2309            .expect("patch should apply");
2310
2311        let mut migrator = MigratorBuilder::new(client.clone())
2312            .track_latency(true)
2313            .read(
2314                |_| async move { Err("fail".into()) }.boxed(),
2315                |_| async move { Err("fail".into()) }.boxed(),
2316                Some(|_: &String, _: &String| true),
2317            )
2318            .write(
2319                |_| async move { Err("fail".into()) }.boxed(),
2320                |_| async move { Err("fail".into()) }.boxed(),
2321            )
2322            .build()
2323            .expect("migrator should build");
2324
2325        let context = ContextBuilder::new("bob")
2326            .build()
2327            .expect("Failed to create context");
2328
2329        migrator
2330            .read(
2331                &context,
2332                "stage-flag".into(),
2333                Stage::Off,
2334                serde_json::Value::Null,
2335            )
2336            .await;
2337        client.flush();
2338        client.close();
2339
2340        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2341        assert_eq!(events.len(), 3);
2342        match &events[1] {
2343            OutputEvent::MigrationOp(event) => {
2344                assert!(event.errors.len() == origins.len());
2345                assert!(event.errors.iter().all(|i| origins.contains(i)));
2346            }
2347            _ => panic!("Expected migration event"),
2348        }
2349    }
2350
2351    #[tokio::test]
2352    async fn migration_tracks_authoritative_write_errors() {
2353        migration_tracks_authoritative_write_errors_driver(Stage::Off, vec![Origin::Old]).await;
2354        migration_tracks_authoritative_write_errors_driver(Stage::DualWrite, vec![Origin::Old])
2355            .await;
2356        migration_tracks_authoritative_write_errors_driver(Stage::Shadow, vec![Origin::Old]).await;
2357        migration_tracks_authoritative_write_errors_driver(Stage::Live, vec![Origin::New]).await;
2358        migration_tracks_authoritative_write_errors_driver(Stage::Rampdown, vec![Origin::New])
2359            .await;
2360        migration_tracks_authoritative_write_errors_driver(Stage::Complete, vec![Origin::New])
2361            .await;
2362    }
2363
2364    async fn migration_tracks_authoritative_write_errors_driver(
2365        stage: Stage,
2366        origins: Vec<Origin>,
2367    ) {
2368        let (client, event_rx) = make_mocked_client();
2369        let client = Arc::new(client);
2370        client.start_with_default_executor();
2371        client
2372            .data_store
2373            .write()
2374            .upsert(
2375                "stage-flag",
2376                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2377            )
2378            .expect("patch should apply");
2379
2380        let mut migrator = MigratorBuilder::new(client.clone())
2381            .track_latency(true)
2382            .read(
2383                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2384                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2385                None,
2386            )
2387            .write(
2388                |_| async move { Err("fail".into()) }.boxed(),
2389                |_| async move { Err("fail".into()) }.boxed(),
2390            )
2391            .build()
2392            .expect("migrator should build");
2393
2394        let context = ContextBuilder::new("bob")
2395            .build()
2396            .expect("Failed to create context");
2397
2398        migrator
2399            .write(
2400                &context,
2401                "stage-flag".into(),
2402                Stage::Off,
2403                serde_json::Value::Null,
2404            )
2405            .await;
2406
2407        client.flush();
2408        client.close();
2409
2410        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2411        assert_eq!(events.len(), 3);
2412        match &events[1] {
2413            OutputEvent::MigrationOp(event) => {
2414                assert!(event.errors.len() == origins.len());
2415                assert!(event.errors.iter().all(|i| origins.contains(i)));
2416            }
2417            _ => panic!("Expected migration event"),
2418        }
2419    }
2420
2421    #[tokio::test]
2422    async fn migration_tracks_nonauthoritative_write_errors() {
2423        migration_tracks_nonauthoritative_write_errors_driver(
2424            Stage::DualWrite,
2425            false,
2426            true,
2427            vec![Origin::New],
2428        )
2429        .await;
2430        migration_tracks_nonauthoritative_write_errors_driver(
2431            Stage::Shadow,
2432            false,
2433            true,
2434            vec![Origin::New],
2435        )
2436        .await;
2437        migration_tracks_nonauthoritative_write_errors_driver(
2438            Stage::Live,
2439            true,
2440            false,
2441            vec![Origin::Old],
2442        )
2443        .await;
2444        migration_tracks_nonauthoritative_write_errors_driver(
2445            Stage::Rampdown,
2446            true,
2447            false,
2448            vec![Origin::Old],
2449        )
2450        .await;
2451    }
2452
2453    async fn migration_tracks_nonauthoritative_write_errors_driver(
2454        stage: Stage,
2455        fail_old: bool,
2456        fail_new: bool,
2457        origins: Vec<Origin>,
2458    ) {
2459        let (client, event_rx) = make_mocked_client();
2460        let client = Arc::new(client);
2461        client.start_with_default_executor();
2462        client
2463            .data_store
2464            .write()
2465            .upsert(
2466                "stage-flag",
2467                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2468            )
2469            .expect("patch should apply");
2470
2471        let mut migrator = MigratorBuilder::new(client.clone())
2472            .track_latency(true)
2473            .read(
2474                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2475                |_| async move { Ok(serde_json::Value::Null) }.boxed(),
2476                None,
2477            )
2478            .write(
2479                move |_| {
2480                    async move {
2481                        if fail_old {
2482                            Err("fail".into())
2483                        } else {
2484                            Ok(serde_json::Value::Null)
2485                        }
2486                    }
2487                    .boxed()
2488                },
2489                move |_| {
2490                    async move {
2491                        if fail_new {
2492                            Err("fail".into())
2493                        } else {
2494                            Ok(serde_json::Value::Null)
2495                        }
2496                    }
2497                    .boxed()
2498                },
2499            )
2500            .build()
2501            .expect("migrator should build");
2502
2503        let context = ContextBuilder::new("bob")
2504            .build()
2505            .expect("Failed to create context");
2506
2507        migrator
2508            .write(
2509                &context,
2510                "stage-flag".into(),
2511                Stage::Off,
2512                serde_json::Value::Null,
2513            )
2514            .await;
2515
2516        client.flush();
2517        client.close();
2518
2519        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2520        assert_eq!(events.len(), 3);
2521        match &events[1] {
2522            OutputEvent::MigrationOp(event) => {
2523                assert!(event.errors.len() == origins.len());
2524                assert!(event.errors.iter().all(|i| origins.contains(i)));
2525            }
2526            _ => panic!("Expected migration event"),
2527        }
2528    }
2529
2530    #[tokio::test]
2531    async fn migration_tracks_consistency() {
2532        migration_tracks_consistency_driver(Stage::Shadow, "same", "same", true).await;
2533        migration_tracks_consistency_driver(Stage::Shadow, "same", "different", false).await;
2534        migration_tracks_consistency_driver(Stage::Live, "same", "same", true).await;
2535        migration_tracks_consistency_driver(Stage::Live, "same", "different", false).await;
2536    }
2537
2538    async fn migration_tracks_consistency_driver(
2539        stage: Stage,
2540        old_return: &'static str,
2541        new_return: &'static str,
2542        expected_consistency: bool,
2543    ) {
2544        let (client, event_rx) = make_mocked_client();
2545        let client = Arc::new(client);
2546        client.start_with_default_executor();
2547        client
2548            .data_store
2549            .write()
2550            .upsert(
2551                "stage-flag",
2552                PatchTarget::Flag(StorageItem::Item(basic_migration_flag("stage-flag", stage))),
2553            )
2554            .expect("patch should apply");
2555
2556        let mut migrator = MigratorBuilder::new(client.clone())
2557            .track_latency(true)
2558            .read(
2559                |_| {
2560                    async move {
2561                        async_std::task::sleep(Duration::from_millis(100)).await;
2562                        Ok(serde_json::Value::String(old_return.to_string()))
2563                    }
2564                    .boxed()
2565                },
2566                |_| {
2567                    async move {
2568                        async_std::task::sleep(Duration::from_millis(100)).await;
2569                        Ok(serde_json::Value::String(new_return.to_string()))
2570                    }
2571                    .boxed()
2572                },
2573                Some(|lhs, rhs| lhs == rhs),
2574            )
2575            .write(
2576                |_| {
2577                    async move {
2578                        async_std::task::sleep(Duration::from_millis(100)).await;
2579                        Ok(serde_json::Value::Null)
2580                    }
2581                    .boxed()
2582                },
2583                |_| {
2584                    async move {
2585                        async_std::task::sleep(Duration::from_millis(100)).await;
2586                        Ok(serde_json::Value::Null)
2587                    }
2588                    .boxed()
2589                },
2590            )
2591            .build()
2592            .expect("migrator should build");
2593
2594        let context = ContextBuilder::new("bob")
2595            .build()
2596            .expect("Failed to create context");
2597
2598        migrator
2599            .read(
2600                &context,
2601                "stage-flag".into(),
2602                Stage::Off,
2603                serde_json::Value::Null,
2604            )
2605            .await;
2606
2607        client.flush();
2608        client.close();
2609
2610        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
2611        assert_eq!(events.len(), 3);
2612        match &events[1] {
2613            OutputEvent::MigrationOp(event) => {
2614                assert!(event.consistency_check == Some(expected_consistency))
2615            }
2616            _ => panic!("Expected migration event"),
2617        }
2618    }
2619
2620    fn make_mocked_client_with_delay(
2621        delay: u64,
2622        offline: bool,
2623        daemon_mode: bool,
2624    ) -> (Client, Receiver<OutputEvent>) {
2625        let updates = Arc::new(MockDataSource::new_with_init_delay(delay));
2626        let (event_sender, event_rx) = create_event_sender();
2627
2628        let config = ConfigBuilder::new("sdk-key")
2629            .offline(offline)
2630            .daemon_mode(daemon_mode)
2631            .data_source(MockDataSourceBuilder::new().data_source(updates))
2632            .event_processor(
2633                EventProcessorBuilder::<HttpConnector>::new().event_sender(Arc::new(event_sender)),
2634            )
2635            .build()
2636            .expect("config should build");
2637
2638        let client = Client::build(config).expect("Should be built.");
2639
2640        (client, event_rx)
2641    }
2642
2643    fn make_mocked_offline_client() -> (Client, Receiver<OutputEvent>) {
2644        make_mocked_client_with_delay(0, true, false)
2645    }
2646
2647    fn make_mocked_client() -> (Client, Receiver<OutputEvent>) {
2648        make_mocked_client_with_delay(0, false, false)
2649    }
2650}