launchdarkly_server_sdk/migrations/
migrator.rs

1use std::sync::Arc;
2use std::sync::Mutex;
3use std::time::Instant;
4
5use futures::future::join_all;
6use futures::future::BoxFuture;
7use futures::future::FutureExt;
8use launchdarkly_server_sdk_evaluation::Context;
9use rand::rng;
10use serde::Serialize;
11
12use crate::sampler::Sampler;
13use crate::sampler::ThreadRngSampler;
14use crate::{Client, ExecutionOrder, MigrationOpTracker, Operation, Origin, Stage};
15
16#[derive(Serialize)]
17#[serde(rename_all = "camelCase")]
18/// An internally used struct to represent the result of a migration operation along with the
19/// origin it was executed against.
20pub struct MigrationOriginResult<T> {
21    pub origin: Origin,
22    pub result: MigrationResult<T>,
23}
24
/// MigrationResult represents the outcome of a single migration operation.
///
/// On success it holds the value produced by the operation; on failure it holds a string
/// describing the error. The origin the operation ran against is *not* part of this type -- it
/// is carried next to the result by [MigrationOriginResult].
type MigrationResult<T> = Result<T, String>;
30
31/// A write result contains the operation results against both the authoritative and
32/// non-authoritative origins.
33///
34/// Authoritative writes are always executed first. In the event of a failure, the
35/// non-authoritative write will not be executed, resulting in a `None` value in the final
36/// MigrationWriteResult.
37pub struct MigrationWriteResult<T> {
38    pub authoritative: MigrationOriginResult<T>,
39    pub nonauthoritative: Option<MigrationOriginResult<T>>,
40}
41
// Signature of the optional consistency check applied to a pair of read results: a plain
// function pointer that returns true when the two values are considered equal, and false
// otherwise.
type MigrationComparisonFn<T> = fn(&T, &T) -> bool;
45
46struct MigrationConfig<P, T, FO, FN>
47where
48    P: Send + Sync,
49    T: Send + Sync,
50    FO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
51    FN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
52{
53    old: FO,
54    new: FN,
55    compare: Option<MigrationComparisonFn<T>>,
56
57    _p: std::marker::PhantomData<P>,
58}
59
60/// The migration builder is used to configure and construct an instance of a [Migrator]. This
61/// migrator can be used to perform LaunchDarkly assisted technology migrations through the use of
62/// migration-based feature flags.
63pub struct MigratorBuilder<P, T, FRO, FRN, FWO, FWN>
64where
65    P: Send + Sync,
66    T: Send + Sync,
67    FRO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
68    FRN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
69    FWO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
70    FWN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
71{
72    client: Arc<Client>,
73    read_execution_order: ExecutionOrder,
74    measure_latency: bool,
75    measure_errors: bool,
76
77    read_config: Option<MigrationConfig<P, T, FRO, FRN>>,
78    write_config: Option<MigrationConfig<P, T, FWO, FWN>>,
79}
80
81impl<P, T, FRO, FRN, FWO, FWN> MigratorBuilder<P, T, FRO, FRN, FWO, FWN>
82where
83    P: Send + Sync,
84    T: Send + Sync,
85    FRO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
86    FRN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
87    FWO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
88    FWN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
89{
90    /// Create a new migrator builder instance with the provided client.
91    pub fn new(client: Arc<Client>) -> Self {
92        MigratorBuilder {
93            client,
94            read_execution_order: ExecutionOrder::Concurrent,
95            measure_latency: true,
96            measure_errors: true,
97            read_config: None,
98            write_config: None,
99        }
100    }
101
102    /// The read execution order influences the concurrency and execution order for read operations
103    /// involving multiple origins.
104    pub fn read_execution_order(mut self, order: ExecutionOrder) -> Self {
105        self.read_execution_order = order;
106        self
107    }
108
109    /// Enable or disable latency tracking for migration operations. This latency information can
110    /// be sent upstream to LaunchDarkly to enhance migration visibility.
111    pub fn track_latency(mut self, measure: bool) -> Self {
112        self.measure_latency = measure;
113        self
114    }
115
116    /// Enable or disable error tracking for migration operations. This error information can be
117    /// sent upstream to LaunchDarkly to enhance migration visibility.
118    pub fn track_errors(mut self, measure: bool) -> Self {
119        self.measure_errors = measure;
120        self
121    }
122
123    /// Read can be used to configure the migration-read behavior of the resulting
124    /// [Migrator] instance.
125    ///
126    /// Users are required to provide two different read methods -- one to read from the old migration origin, and one
127    /// to read from the new origin. Additionally, users can opt-in to consistency tracking by providing a
128    /// comparison function.
129    ///
130    /// Depending on the migration stage, one or both of these read methods may be called.
131    pub fn read(mut self, old: FRO, new: FRN, compare: Option<MigrationComparisonFn<T>>) -> Self {
132        self.read_config = Some(MigrationConfig {
133            old,
134            new,
135            compare,
136            _p: std::marker::PhantomData,
137        });
138        self
139    }
140
141    /// Write can be used to configure the migration-write behavior of the resulting
142    /// [crate::Migrator] instance.
143    ///
144    /// Users are required to provide two different write methods -- one to write to the old
145    /// migration origin, and one to write to the new origin. Not every stage requires
146    ///
147    /// Depending on the migration stage, one or both of these write methods may be called.
148    pub fn write(mut self, old: FWO, new: FWN) -> Self {
149        self.write_config = Some(MigrationConfig {
150            old,
151            new,
152            compare: None,
153            _p: std::marker::PhantomData,
154        });
155        self
156    }
157
158    /// Build constructs a [crate::Migrator] instance to support migration-based reads and
159    /// writes. A string describing any failure conditions will be returned if the build fails.
160    pub fn build(self) -> Result<Migrator<P, T, FRO, FRN, FWO, FWN>, String> {
161        let read_config = self.read_config.ok_or("read configuration not provided")?;
162        let write_config = self
163            .write_config
164            .ok_or("write configuration not provided")?;
165
166        Ok(Migrator::new(
167            self.client,
168            self.read_execution_order,
169            self.measure_latency,
170            self.measure_errors,
171            read_config,
172            write_config,
173        ))
174    }
175}
176
177/// The migrator is the primary interface for executing migration operations. It is configured
178/// through the [MigratorBuilder] and can be used to perform LaunchDarkly assisted technology
179/// migrations through the use of migration-based feature flags.
180pub struct Migrator<P, T, FRO, FRN, FWO, FWN>
181where
182    P: Send + Sync,
183    T: Send + Sync,
184    FRO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
185    FRN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
186    FWO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
187    FWN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
188{
189    client: Arc<Client>,
190    read_execution_order: ExecutionOrder,
191    measure_latency: bool,
192    measure_errors: bool,
193    read_config: MigrationConfig<P, T, FRO, FRN>,
194    write_config: MigrationConfig<P, T, FWO, FWN>,
195    sampler: Box<dyn Sampler>,
196}
197
198impl<P, T, FRO, FRN, FWO, FWN> Migrator<P, T, FRO, FRN, FWO, FWN>
199where
200    P: Send + Sync,
201    T: Send + Sync,
202    FRO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
203    FRN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
204    FWO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
205    FWN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
206{
207    fn new(
208        client: Arc<Client>,
209        read_execution_order: ExecutionOrder,
210        measure_latency: bool,
211        measure_errors: bool,
212        read_config: MigrationConfig<P, T, FRO, FRN>,
213        write_config: MigrationConfig<P, T, FWO, FWN>,
214    ) -> Self {
215        Migrator {
216            client,
217            read_execution_order,
218            measure_latency,
219            measure_errors,
220            read_config,
221            write_config,
222            sampler: Box::new(ThreadRngSampler::new(rng())),
223        }
224    }
225
226    /// Uses the provided flag key and context to execute a migration-backed read operation.
227    pub async fn read(
228        &mut self,
229        context: &Context,
230        flag_key: String,
231        default_stage: Stage,
232        payload: P,
233    ) -> MigrationOriginResult<T> {
234        let (stage, tracker) = self
235            .client
236            .migration_variation(context, &flag_key, default_stage);
237
238        if let Ok(mut tracker) = tracker.lock() {
239            tracker.operation(Operation::Read);
240        } else {
241            error!("Failed to acquire tracker lock. Cannot track migration write.");
242        }
243
244        let mut old = Executor {
245            origin: Origin::Old,
246            function: &self.read_config.old,
247            tracker: tracker.clone(),
248            measure_latency: self.measure_latency,
249            measure_errors: self.measure_errors,
250            payload: &payload,
251        };
252        let mut new = Executor {
253            origin: Origin::New,
254            function: &self.read_config.new,
255            tracker: tracker.clone(),
256            measure_latency: self.measure_latency,
257            measure_errors: self.measure_errors,
258            payload: &payload,
259        };
260
261        let result = match stage {
262            Stage::Off => old.run().await,
263            Stage::DualWrite => old.run().await,
264            Stage::Shadow => {
265                read_both(
266                    old,
267                    new,
268                    self.read_config.compare,
269                    self.read_execution_order,
270                    tracker.clone(),
271                    self.sampler.as_mut(),
272                )
273                .await
274            }
275            Stage::Live => {
276                read_both(
277                    new,
278                    old,
279                    self.read_config.compare,
280                    self.read_execution_order,
281                    tracker.clone(),
282                    self.sampler.as_mut(),
283                )
284                .await
285            }
286            Stage::Rampdown => new.run().await,
287            Stage::Complete => new.run().await,
288        };
289
290        self.client.track_migration_op(tracker);
291
292        result
293    }
294
295    /// Uses the provided flag key and context to execute a migration-backed write operation.
296    pub async fn write(
297        &mut self,
298        context: &Context,
299        flag_key: String,
300        default_stage: Stage,
301        payload: P,
302    ) -> MigrationWriteResult<T> {
303        let (stage, tracker) = self
304            .client
305            .migration_variation(context, &flag_key, default_stage);
306
307        if let Ok(mut tracker) = tracker.lock() {
308            tracker.operation(Operation::Write);
309        } else {
310            error!("Failed to acquire tracker lock. Cannot track migration write.");
311        }
312
313        let mut old = Executor {
314            origin: Origin::Old,
315            function: &self.write_config.old,
316            tracker: tracker.clone(),
317            measure_latency: self.measure_latency,
318            measure_errors: self.measure_errors,
319            payload: &payload,
320        };
321        let mut new = Executor {
322            origin: Origin::New,
323            function: &self.write_config.new,
324            tracker: tracker.clone(),
325            measure_latency: self.measure_latency,
326            measure_errors: self.measure_errors,
327            payload: &payload,
328        };
329
330        let result = match stage {
331            Stage::Off => MigrationWriteResult {
332                authoritative: old.run().await,
333                nonauthoritative: None,
334            },
335            Stage::DualWrite => write_both(old, new).await,
336            Stage::Shadow => write_both(old, new).await,
337            Stage::Live => write_both(new, old).await,
338            Stage::Rampdown => write_both(new, old).await,
339            Stage::Complete => MigrationWriteResult {
340                authoritative: new.run().await,
341                nonauthoritative: None,
342            },
343        };
344
345        self.client.track_migration_op(tracker);
346
347        result
348    }
349}
350
351async fn read_both<P, T, FA, FB>(
352    mut authoritative: Executor<'_, P, T, FA>,
353    mut nonauthoritative: Executor<'_, P, T, FB>,
354    compare: Option<MigrationComparisonFn<T>>,
355    execution_order: ExecutionOrder,
356    tracker: Arc<Mutex<MigrationOpTracker>>,
357    sampler: &mut dyn Sampler,
358) -> MigrationOriginResult<T>
359where
360    P: Send + Sync,
361    T: Send + Sync,
362    FA: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
363    FB: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
364{
365    let authoritative_result: MigrationOriginResult<T>;
366    let nonauthoritative_result: MigrationOriginResult<T>;
367
368    match execution_order {
369        ExecutionOrder::Concurrent => {
370            let auth_handle = authoritative.run().boxed();
371            let nonauth_handle = nonauthoritative.run().boxed();
372            let handles = vec![auth_handle, nonauth_handle];
373
374            let mut results = join_all(handles).await;
375
376            // Note that we are doing this is the reverse order of the handles since we are
377            // popping the results off the end of the vector.
378            nonauthoritative_result = results.pop().unwrap_or_else(|| MigrationOriginResult {
379                origin: nonauthoritative.origin,
380                result: Err("Failed to execute non-authoritative read".into()),
381            });
382
383            authoritative_result = results.pop().unwrap_or_else(|| MigrationOriginResult {
384                origin: authoritative.origin,
385                result: Err("Failed to execute authoritative read".into()),
386            });
387        }
388        ExecutionOrder::Random if sampler.sample(2) => {
389            nonauthoritative_result = nonauthoritative.run().await;
390            authoritative_result = authoritative.run().await;
391        }
392        _ => {
393            authoritative_result = authoritative.run().await;
394            nonauthoritative_result = nonauthoritative.run().await;
395        }
396    };
397
398    if let Some(compare) = compare {
399        if let (Ok(authoritative), Ok(nonauthoritative)) = (
400            &authoritative_result.result,
401            &nonauthoritative_result.result,
402        ) {
403            if let Ok(mut tracker) = tracker.lock() {
404                tracker.consistent(|| compare(authoritative, nonauthoritative));
405            } else {
406                error!("Failed to acquire tracker lock. Cannot track consistency.");
407            }
408        }
409    }
410
411    authoritative_result
412}
413
414async fn write_both<P, T, FA, FB>(
415    mut authoritative: Executor<'_, P, T, FA>,
416    mut nonauthoritative: Executor<'_, P, T, FB>,
417) -> MigrationWriteResult<T>
418where
419    P: Send + Sync,
420    T: Send + Sync,
421    FA: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
422    FB: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
423{
424    let authoritative_result = authoritative.run().await;
425
426    if authoritative_result.result.is_err() {
427        return MigrationWriteResult {
428            authoritative: authoritative_result,
429            nonauthoritative: None,
430        };
431    }
432
433    let nonauthoritative_result = nonauthoritative.run().await;
434
435    MigrationWriteResult {
436        authoritative: authoritative_result,
437        nonauthoritative: Some(nonauthoritative_result),
438    }
439}
440
441struct Executor<'a, P, T, F>
442where
443    P: Send + Sync,
444    T: Send + Sync,
445    F: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
446{
447    origin: Origin,
448    function: &'a F,
449    tracker: Arc<Mutex<MigrationOpTracker>>,
450    measure_latency: bool,
451    measure_errors: bool,
452    payload: &'a P,
453}
454
455impl<P, T, F> Executor<'_, P, T, F>
456where
457    P: Send + Sync,
458    T: Send + Sync,
459    F: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
460{
461    async fn run(&mut self) -> MigrationOriginResult<T> {
462        let start = Instant::now();
463        let result = (self.function)(self.payload).await;
464        let elapsed = start.elapsed();
465
466        let result = match self.tracker.lock() {
467            Ok(mut tracker) => {
468                if self.measure_latency {
469                    tracker.latency(self.origin, elapsed);
470                }
471
472                if self.measure_errors && result.is_err() {
473                    tracker.error(self.origin);
474                }
475
476                tracker.invoked(self.origin);
477
478                result
479            }
480            Err(_) => Err("Failed to acquire lock".into()),
481        };
482
483        MigrationOriginResult {
484            origin: self.origin,
485            result,
486        }
487    }
488}
489
490#[cfg(test)]
491mod tests {
492    use std::{
493        sync::{mpsc, Arc},
494        time::{Duration, Instant},
495    };
496
497    use crate::{
498        migrations::migrator::MigratorBuilder, Client, ConfigBuilder, ExecutionOrder, Stage,
499    };
500    use futures::future::FutureExt;
501    use launchdarkly_server_sdk_evaluation::ContextBuilder;
502    use test_case::test_case;
503
504    #[test]
505    fn can_build_successfully() {
506        let config = ConfigBuilder::new("sdk-key")
507            .offline(true)
508            .build()
509            .expect("config failed to build");
510
511        let client = Arc::new(Client::build(config).expect("client failed to build"));
512        let migrator = MigratorBuilder::new(client)
513            .track_latency(false)
514            .track_errors(false)
515            .read(
516                |_: &u32| async move { Ok(()) }.boxed(),
517                |_: &u32| async move { Ok(()) }.boxed(),
518                Some(|_, _| true),
519            )
520            .write(
521                |_: &u32| async move { Ok(()) }.boxed(),
522                |_: &u32| async move { Ok(()) }.boxed(),
523            )
524            .build();
525
526        assert!(migrator.is_ok());
527    }
528
529    #[tokio::test]
530    async fn read_passes_payload_through() {
531        let config = ConfigBuilder::new("sdk-key")
532            .offline(true)
533            .build()
534            .expect("config failed to build");
535
536        let client = Arc::new(Client::build(config).expect("client failed to build"));
537        client.start_with_default_executor();
538
539        let (sender, receiver) = mpsc::channel();
540        let old_sender = sender.clone();
541        let new_sender = sender.clone();
542        let mut migrator = MigratorBuilder::new(client)
543            .track_latency(false)
544            .track_errors(false)
545            .write(
546                |_| async move { Ok(0) }.boxed(),
547                |_| async move { Ok(0) }.boxed(),
548            )
549            .read_execution_order(ExecutionOrder::Serial)
550            .read(
551                move |&payload| {
552                    let old_sender = old_sender.clone();
553                    async move {
554                        old_sender.send(payload).unwrap();
555                        Ok(0)
556                    }
557                    .boxed()
558                },
559                move |&payload| {
560                    let new_sender = new_sender.clone();
561                    async move {
562                        new_sender.send(payload).unwrap();
563                        Ok(0)
564                    }
565                    .boxed()
566                },
567                None,
568            )
569            .build()
570            .expect("migrator failed to build");
571
572        let _result = migrator
573            .read(
574                &ContextBuilder::new("user-key")
575                    .build()
576                    .expect("context failed to build"),
577                "migration-key".into(),
578                crate::Stage::Shadow,
579                1,
580            )
581            .await;
582
583        let old_payload = receiver.recv().unwrap();
584        let new_payload = receiver.recv().unwrap();
585
586        assert_eq!(old_payload, 1);
587        assert_eq!(new_payload, 1);
588    }
589
590    #[tokio::test]
591    async fn write_passes_payload_through() {
592        let config = ConfigBuilder::new("sdk-key")
593            .offline(true)
594            .build()
595            .expect("config failed to build");
596
597        let client = Arc::new(Client::build(config).expect("client failed to build"));
598        client.start_with_default_executor();
599
600        let (sender, receiver) = mpsc::channel();
601        let old_sender = sender.clone();
602        let new_sender = sender.clone();
603        let mut migrator = MigratorBuilder::new(client)
604            .track_latency(false)
605            .track_errors(false)
606            .read(
607                |_| async move { Ok(0) }.boxed(),
608                |_| async move { Ok(0) }.boxed(),
609                Some(|_, _| true),
610            )
611            .write(
612                move |&payload| {
613                    let old_sender = old_sender.clone();
614                    async move {
615                        old_sender.send(payload).unwrap();
616                        Ok(0)
617                    }
618                    .boxed()
619                },
620                move |&payload| {
621                    let new_sender = new_sender.clone();
622                    async move {
623                        new_sender.send(payload).unwrap();
624                        Ok(0)
625                    }
626                    .boxed()
627                },
628            )
629            .build()
630            .expect("migrator failed to build");
631
632        let _result = migrator
633            .write(
634                &ContextBuilder::new("user-key")
635                    .build()
636                    .expect("context failed to build"),
637                "migration-key".into(),
638                crate::Stage::Shadow,
639                1,
640            )
641            .await;
642
643        let old_payload = receiver.recv().unwrap();
644        let new_payload = receiver.recv().unwrap();
645
646        assert_eq!(old_payload, 1);
647        assert_eq!(new_payload, 1);
648    }
649
650    #[tokio::test]
651    async fn read_handles_correct_origin() {
652        read_handles_correct_origin_driver(Stage::Off, true, false).await;
653        read_handles_correct_origin_driver(Stage::DualWrite, true, false).await;
654        read_handles_correct_origin_driver(Stage::Shadow, true, true).await;
655        read_handles_correct_origin_driver(Stage::Live, true, true).await;
656        read_handles_correct_origin_driver(Stage::Rampdown, false, true).await;
657        read_handles_correct_origin_driver(Stage::Complete, false, true).await;
658    }
659
660    async fn read_handles_correct_origin_driver(
661        stage: Stage,
662        expected_old: bool,
663        expected_new: bool,
664    ) {
665        let config = ConfigBuilder::new("sdk-key")
666            .offline(true)
667            .build()
668            .expect("config failed to build");
669
670        let client = Arc::new(Client::build(config).expect("client failed to build"));
671        client.start_with_default_executor();
672
673        let (sender, receiver) = mpsc::channel();
674        let old_sender = sender.clone();
675        let new_sender = sender.clone();
676        let mut migrator = MigratorBuilder::new(client)
677            .track_latency(false)
678            .track_errors(false)
679            .write(
680                |_| async move { Ok("write") }.boxed(),
681                |_| async move { Ok("write") }.boxed(),
682            )
683            .read_execution_order(ExecutionOrder::Serial)
684            .read(
685                move |_| {
686                    let old_sender = old_sender.clone();
687                    async move {
688                        old_sender.send("old").unwrap();
689                        Ok("read")
690                    }
691                    .boxed()
692                },
693                move |_| {
694                    let new_sender = new_sender.clone();
695                    async move {
696                        new_sender.send("new").unwrap();
697                        Ok("read")
698                    }
699                    .boxed()
700                },
701                None,
702            )
703            .build()
704            .expect("migrator failed to build");
705
706        let _result = migrator
707            .read(
708                &ContextBuilder::new("user-key")
709                    .build()
710                    .expect("context failed to build"),
711                "migration-key".into(),
712                stage,
713                "payload",
714            )
715            .await;
716
717        let payloads = receiver.try_iter().collect::<Vec<_>>();
718
719        if expected_old {
720            assert!(payloads.contains(&"old"));
721        } else {
722            assert!(!payloads.contains(&"old"));
723        }
724
725        if expected_new {
726            assert!(payloads.contains(&"new"));
727        } else {
728            assert!(!payloads.contains(&"new"));
729        }
730    }
731
732    #[tokio::test]
733    async fn read_handles_concurrent_execution() {
734        let config = ConfigBuilder::new("sdk-key")
735            .offline(true)
736            .build()
737            .expect("config failed to build");
738
739        let client = Arc::new(Client::build(config).expect("client failed to build"));
740        client.start_with_default_executor();
741
742        let mut migrator = MigratorBuilder::new(client)
743            .track_latency(false)
744            .track_errors(false)
745            .write(
746                |_| async move { Ok(()) }.boxed(),
747                |_| async move { Ok(()) }.boxed(),
748            )
749            .read_execution_order(ExecutionOrder::Concurrent)
750            .read(
751                |_| {
752                    async move {
753                        async_std::task::sleep(std::time::Duration::from_millis(250)).await;
754                        Ok(())
755                    }
756                    .boxed()
757                },
758                |_| {
759                    async move {
760                        async_std::task::sleep(std::time::Duration::from_millis(250)).await;
761                        Ok(())
762                    }
763                    .boxed()
764                },
765                None,
766            )
767            .build()
768            .expect("migrator failed to build");
769
770        let start = Instant::now();
771        let _result = migrator
772            .read(
773                &ContextBuilder::new("user-key")
774                    .build()
775                    .expect("context failed to build"),
776                "migration-key".into(),
777                crate::Stage::Shadow,
778                (),
779            )
780            .await;
781        let elapsed = start.elapsed();
782        assert!(elapsed < Duration::from_millis(500));
783    }
784
785    #[tokio::test]
786    async fn read_handles_nonconcurrent_execution() {
787        read_handles_nonconcurrent_execution_driver(ExecutionOrder::Serial).await;
788        read_handles_nonconcurrent_execution_driver(ExecutionOrder::Random).await;
789    }
790
791    async fn read_handles_nonconcurrent_execution_driver(execution_order: ExecutionOrder) {
792        let config = ConfigBuilder::new("sdk-key")
793            .offline(true)
794            .build()
795            .expect("config failed to build");
796
797        let client = Arc::new(Client::build(config).expect("client failed to build"));
798        client.start_with_default_executor();
799
800        let mut migrator = MigratorBuilder::new(client)
801            .track_latency(false)
802            .track_errors(false)
803            .write(
804                |_| async move { Ok(()) }.boxed(),
805                |_| async move { Ok(()) }.boxed(),
806            )
807            .read_execution_order(execution_order)
808            .read(
809                |_| {
810                    async move {
811                        std::thread::sleep(std::time::Duration::from_millis(250));
812                        Ok(())
813                    }
814                    .boxed()
815                },
816                |_| {
817                    async move {
818                        std::thread::sleep(std::time::Duration::from_millis(250));
819                        Ok(())
820                    }
821                    .boxed()
822                },
823                None,
824            )
825            .build()
826            .expect("migrator failed to build");
827
828        let start = Instant::now();
829        let _result = migrator
830            .read(
831                &ContextBuilder::new("user-key")
832                    .build()
833                    .expect("context failed to build"),
834                "migration-key".into(),
835                crate::Stage::Shadow,
836                (),
837            )
838            .await;
839        let elapsed = start.elapsed();
840        assert!(elapsed >= Duration::from_millis(500));
841    }
842
843    #[tokio::test]
844    async fn write_handles_correct_origin() {
845        write_handles_correct_origin_driver(Stage::Off, true, false).await;
846        write_handles_correct_origin_driver(Stage::DualWrite, true, true).await;
847        write_handles_correct_origin_driver(Stage::Shadow, true, true).await;
848        write_handles_correct_origin_driver(Stage::Live, true, true).await;
849        write_handles_correct_origin_driver(Stage::Rampdown, true, true).await;
850        write_handles_correct_origin_driver(Stage::Complete, false, true).await;
851    }
852
853    async fn write_handles_correct_origin_driver(
854        stage: Stage,
855        expected_old: bool,
856        expected_new: bool,
857    ) {
858        let config = ConfigBuilder::new("sdk-key")
859            .offline(true)
860            .build()
861            .expect("config failed to build");
862
863        let client = Arc::new(Client::build(config).expect("client failed to build"));
864        client.start_with_default_executor();
865
866        let (sender, receiver) = mpsc::channel();
867        let old_sender = sender.clone();
868        let new_sender = sender.clone();
869        let mut migrator = MigratorBuilder::new(client)
870            .track_latency(false)
871            .track_errors(false)
872            .read(
873                |_| async move { Ok(()) }.boxed(),
874                |_| async move { Ok(()) }.boxed(),
875                Some(|_, _| true),
876            )
877            .write(
878                move |_| {
879                    let old_sender = old_sender.clone();
880                    async move {
881                        old_sender.send("old").unwrap();
882                        Ok(())
883                    }
884                    .boxed()
885                },
886                move |_| {
887                    let new_sender = new_sender.clone();
888                    async move {
889                        new_sender.send("new").unwrap();
890                        Ok(())
891                    }
892                    .boxed()
893                },
894            )
895            .build()
896            .expect("migrator failed to build");
897
898        let _result = migrator
899            .write(
900                &ContextBuilder::new("user-key")
901                    .build()
902                    .expect("context failed to build"),
903                "migration-key".into(),
904                stage,
905                (),
906            )
907            .await;
908
909        let payloads = receiver.try_iter().collect::<Vec<_>>();
910
911        if expected_old {
912            assert!(payloads.contains(&"old"));
913        } else {
914            assert!(!payloads.contains(&"old"));
915        }
916
917        if expected_new {
918            assert!(payloads.contains(&"new"));
919        } else {
920            assert!(!payloads.contains(&"new"));
921        }
922    }
923
924    #[tokio::test]
925    async fn write_stops_if_authoritative_fails() {
926        // doesn't write to new if old fails
927        // write_stops_if_authoritative_fails_driver(Stage::Off, true, false).await;
928
929        write_stops_if_authoritative_fails_driver(Stage::DualWrite, true, false).await;
930        write_stops_if_authoritative_fails_driver(Stage::Shadow, true, false).await;
931        write_stops_if_authoritative_fails_driver(Stage::Live, false, true).await;
932        write_stops_if_authoritative_fails_driver(Stage::Rampdown, false, true).await;
933
934        // doesn't write to old if new fails
935        // write_stops_if_authoritative_fails_driver(Stage::Complete, false, true).await;
936    }
937
938    async fn write_stops_if_authoritative_fails_driver(
939        stage: Stage,
940        expected_old: bool,
941        expected_new: bool,
942    ) {
943        let config = ConfigBuilder::new("sdk-key")
944            .offline(true)
945            .build()
946            .expect("config failed to build");
947
948        let client = Arc::new(Client::build(config).expect("client failed to build"));
949        client.start_with_default_executor();
950
951        let (sender, receiver) = mpsc::channel();
952        let old_sender = sender.clone();
953        let new_sender = sender.clone();
954        let mut migrator = MigratorBuilder::new(client)
955            .track_latency(false)
956            .track_errors(false)
957            .read(
958                |_| async move { Ok(()) }.boxed(),
959                |_| async move { Ok(()) }.boxed(),
960                Some(|_, _| true),
961            )
962            .write(
963                move |_| {
964                    let old_sender = old_sender.clone();
965                    async move {
966                        old_sender.send("old").unwrap();
967                        Err("error".into())
968                    }
969                    .boxed()
970                },
971                move |_| {
972                    let new_sender = new_sender.clone();
973                    async move {
974                        new_sender.send("new").unwrap();
975                        Err("error".into())
976                    }
977                    .boxed()
978                },
979            )
980            .build()
981            .expect("migrator failed to build");
982
983        let _result = migrator
984            .write(
985                &ContextBuilder::new("user-key")
986                    .build()
987                    .expect("context failed to build"),
988                "migration-key".into(),
989                stage,
990                (),
991            )
992            .await;
993
994        let payloads = receiver.try_iter().collect::<Vec<_>>();
995
996        if expected_old {
997            assert!(payloads.contains(&"old"));
998        } else {
999            assert!(!payloads.contains(&"old"));
1000        }
1001
1002        if expected_new {
1003            assert!(payloads.contains(&"new"));
1004        } else {
1005            assert!(!payloads.contains(&"new"));
1006        }
1007    }
1008
1009    #[test_case(ExecutionOrder::Serial)]
1010    #[test_case(ExecutionOrder::Random)]
1011    #[test_case(ExecutionOrder::Concurrent)]
1012    fn can_modify_execution_order(execution_order: ExecutionOrder) {
1013        let config = ConfigBuilder::new("sdk-key")
1014            .offline(true)
1015            .build()
1016            .expect("config failed to build");
1017
1018        let client = Arc::new(Client::build(config).expect("client failed to build"));
1019        let migrator = MigratorBuilder::new(client)
1020            .track_latency(false)
1021            .track_errors(false)
1022            .read(
1023                |_: &u32| async move { Ok(()) }.boxed(),
1024                |_: &u32| async move { Ok(()) }.boxed(),
1025                Some(|_, _| true),
1026            )
1027            .write(
1028                |_: &u32| async move { Ok(()) }.boxed(),
1029                |_: &u32| async move { Ok(()) }.boxed(),
1030            )
1031            .read_execution_order(execution_order)
1032            .build();
1033
1034        assert!(migrator.is_ok());
1035    }
1036}