1use std::sync::Arc;
2use std::sync::Mutex;
3use std::time::Instant;
4
5use futures::future::join_all;
6use futures::future::BoxFuture;
7use futures::future::FutureExt;
8use launchdarkly_server_sdk_evaluation::Context;
9use rand::rng;
10use serde::Serialize;
11
12use crate::sampler::Sampler;
13use crate::sampler::ThreadRngSampler;
14use crate::{Client, ExecutionOrder, MigrationOpTracker, Operation, Origin, Stage};
15
16#[derive(Serialize)]
17#[serde(rename_all = "camelCase")]
18pub struct MigrationOriginResult<T> {
21 pub origin: Origin,
22 pub result: MigrationResult<T>,
23}
24
25type MigrationResult<T> = Result<T, String>;
30
31pub struct MigrationWriteResult<T> {
38 pub authoritative: MigrationOriginResult<T>,
39 pub nonauthoritative: Option<MigrationOriginResult<T>>,
40}
41
42type MigrationComparisonFn<T> = fn(&T, &T) -> bool;
45
46struct MigrationConfig<P, T, FO, FN>
47where
48 P: Send + Sync,
49 T: Send + Sync,
50 FO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
51 FN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
52{
53 old: FO,
54 new: FN,
55 compare: Option<MigrationComparisonFn<T>>,
56
57 _p: std::marker::PhantomData<P>,
58}
59
60pub struct MigratorBuilder<P, T, FRO, FRN, FWO, FWN>
64where
65 P: Send + Sync,
66 T: Send + Sync,
67 FRO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
68 FRN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
69 FWO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
70 FWN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
71{
72 client: Arc<Client>,
73 read_execution_order: ExecutionOrder,
74 measure_latency: bool,
75 measure_errors: bool,
76
77 read_config: Option<MigrationConfig<P, T, FRO, FRN>>,
78 write_config: Option<MigrationConfig<P, T, FWO, FWN>>,
79}
80
81impl<P, T, FRO, FRN, FWO, FWN> MigratorBuilder<P, T, FRO, FRN, FWO, FWN>
82where
83 P: Send + Sync,
84 T: Send + Sync,
85 FRO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
86 FRN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
87 FWO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
88 FWN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
89{
90 pub fn new(client: Arc<Client>) -> Self {
92 MigratorBuilder {
93 client,
94 read_execution_order: ExecutionOrder::Concurrent,
95 measure_latency: true,
96 measure_errors: true,
97 read_config: None,
98 write_config: None,
99 }
100 }
101
102 pub fn read_execution_order(mut self, order: ExecutionOrder) -> Self {
105 self.read_execution_order = order;
106 self
107 }
108
109 pub fn track_latency(mut self, measure: bool) -> Self {
112 self.measure_latency = measure;
113 self
114 }
115
116 pub fn track_errors(mut self, measure: bool) -> Self {
119 self.measure_errors = measure;
120 self
121 }
122
123 pub fn read(mut self, old: FRO, new: FRN, compare: Option<MigrationComparisonFn<T>>) -> Self {
132 self.read_config = Some(MigrationConfig {
133 old,
134 new,
135 compare,
136 _p: std::marker::PhantomData,
137 });
138 self
139 }
140
141 pub fn write(mut self, old: FWO, new: FWN) -> Self {
149 self.write_config = Some(MigrationConfig {
150 old,
151 new,
152 compare: None,
153 _p: std::marker::PhantomData,
154 });
155 self
156 }
157
158 pub fn build(self) -> Result<Migrator<P, T, FRO, FRN, FWO, FWN>, String> {
161 let read_config = self.read_config.ok_or("read configuration not provided")?;
162 let write_config = self
163 .write_config
164 .ok_or("write configuration not provided")?;
165
166 Ok(Migrator::new(
167 self.client,
168 self.read_execution_order,
169 self.measure_latency,
170 self.measure_errors,
171 read_config,
172 write_config,
173 ))
174 }
175}
176
177pub struct Migrator<P, T, FRO, FRN, FWO, FWN>
181where
182 P: Send + Sync,
183 T: Send + Sync,
184 FRO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
185 FRN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
186 FWO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
187 FWN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
188{
189 client: Arc<Client>,
190 read_execution_order: ExecutionOrder,
191 measure_latency: bool,
192 measure_errors: bool,
193 read_config: MigrationConfig<P, T, FRO, FRN>,
194 write_config: MigrationConfig<P, T, FWO, FWN>,
195 sampler: Box<dyn Sampler>,
196}
197
198impl<P, T, FRO, FRN, FWO, FWN> Migrator<P, T, FRO, FRN, FWO, FWN>
199where
200 P: Send + Sync,
201 T: Send + Sync,
202 FRO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
203 FRN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
204 FWO: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
205 FWN: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
206{
207 fn new(
208 client: Arc<Client>,
209 read_execution_order: ExecutionOrder,
210 measure_latency: bool,
211 measure_errors: bool,
212 read_config: MigrationConfig<P, T, FRO, FRN>,
213 write_config: MigrationConfig<P, T, FWO, FWN>,
214 ) -> Self {
215 Migrator {
216 client,
217 read_execution_order,
218 measure_latency,
219 measure_errors,
220 read_config,
221 write_config,
222 sampler: Box::new(ThreadRngSampler::new(rng())),
223 }
224 }
225
226 pub async fn read(
228 &mut self,
229 context: &Context,
230 flag_key: String,
231 default_stage: Stage,
232 payload: P,
233 ) -> MigrationOriginResult<T> {
234 let (stage, tracker) = self
235 .client
236 .migration_variation(context, &flag_key, default_stage);
237
238 if let Ok(mut tracker) = tracker.lock() {
239 tracker.operation(Operation::Read);
240 } else {
241 error!("Failed to acquire tracker lock. Cannot track migration write.");
242 }
243
244 let mut old = Executor {
245 origin: Origin::Old,
246 function: &self.read_config.old,
247 tracker: tracker.clone(),
248 measure_latency: self.measure_latency,
249 measure_errors: self.measure_errors,
250 payload: &payload,
251 };
252 let mut new = Executor {
253 origin: Origin::New,
254 function: &self.read_config.new,
255 tracker: tracker.clone(),
256 measure_latency: self.measure_latency,
257 measure_errors: self.measure_errors,
258 payload: &payload,
259 };
260
261 let result = match stage {
262 Stage::Off => old.run().await,
263 Stage::DualWrite => old.run().await,
264 Stage::Shadow => {
265 read_both(
266 old,
267 new,
268 self.read_config.compare,
269 self.read_execution_order,
270 tracker.clone(),
271 self.sampler.as_mut(),
272 )
273 .await
274 }
275 Stage::Live => {
276 read_both(
277 new,
278 old,
279 self.read_config.compare,
280 self.read_execution_order,
281 tracker.clone(),
282 self.sampler.as_mut(),
283 )
284 .await
285 }
286 Stage::Rampdown => new.run().await,
287 Stage::Complete => new.run().await,
288 };
289
290 self.client.track_migration_op(tracker);
291
292 result
293 }
294
295 pub async fn write(
297 &mut self,
298 context: &Context,
299 flag_key: String,
300 default_stage: Stage,
301 payload: P,
302 ) -> MigrationWriteResult<T> {
303 let (stage, tracker) = self
304 .client
305 .migration_variation(context, &flag_key, default_stage);
306
307 if let Ok(mut tracker) = tracker.lock() {
308 tracker.operation(Operation::Write);
309 } else {
310 error!("Failed to acquire tracker lock. Cannot track migration write.");
311 }
312
313 let mut old = Executor {
314 origin: Origin::Old,
315 function: &self.write_config.old,
316 tracker: tracker.clone(),
317 measure_latency: self.measure_latency,
318 measure_errors: self.measure_errors,
319 payload: &payload,
320 };
321 let mut new = Executor {
322 origin: Origin::New,
323 function: &self.write_config.new,
324 tracker: tracker.clone(),
325 measure_latency: self.measure_latency,
326 measure_errors: self.measure_errors,
327 payload: &payload,
328 };
329
330 let result = match stage {
331 Stage::Off => MigrationWriteResult {
332 authoritative: old.run().await,
333 nonauthoritative: None,
334 },
335 Stage::DualWrite => write_both(old, new).await,
336 Stage::Shadow => write_both(old, new).await,
337 Stage::Live => write_both(new, old).await,
338 Stage::Rampdown => write_both(new, old).await,
339 Stage::Complete => MigrationWriteResult {
340 authoritative: new.run().await,
341 nonauthoritative: None,
342 },
343 };
344
345 self.client.track_migration_op(tracker);
346
347 result
348 }
349}
350
351async fn read_both<P, T, FA, FB>(
352 mut authoritative: Executor<'_, P, T, FA>,
353 mut nonauthoritative: Executor<'_, P, T, FB>,
354 compare: Option<MigrationComparisonFn<T>>,
355 execution_order: ExecutionOrder,
356 tracker: Arc<Mutex<MigrationOpTracker>>,
357 sampler: &mut dyn Sampler,
358) -> MigrationOriginResult<T>
359where
360 P: Send + Sync,
361 T: Send + Sync,
362 FA: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
363 FB: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
364{
365 let authoritative_result: MigrationOriginResult<T>;
366 let nonauthoritative_result: MigrationOriginResult<T>;
367
368 match execution_order {
369 ExecutionOrder::Concurrent => {
370 let auth_handle = authoritative.run().boxed();
371 let nonauth_handle = nonauthoritative.run().boxed();
372 let handles = vec![auth_handle, nonauth_handle];
373
374 let mut results = join_all(handles).await;
375
376 nonauthoritative_result = results.pop().unwrap_or_else(|| MigrationOriginResult {
379 origin: nonauthoritative.origin,
380 result: Err("Failed to execute non-authoritative read".into()),
381 });
382
383 authoritative_result = results.pop().unwrap_or_else(|| MigrationOriginResult {
384 origin: authoritative.origin,
385 result: Err("Failed to execute authoritative read".into()),
386 });
387 }
388 ExecutionOrder::Random if sampler.sample(2) => {
389 nonauthoritative_result = nonauthoritative.run().await;
390 authoritative_result = authoritative.run().await;
391 }
392 _ => {
393 authoritative_result = authoritative.run().await;
394 nonauthoritative_result = nonauthoritative.run().await;
395 }
396 };
397
398 if let Some(compare) = compare {
399 if let (Ok(authoritative), Ok(nonauthoritative)) = (
400 &authoritative_result.result,
401 &nonauthoritative_result.result,
402 ) {
403 if let Ok(mut tracker) = tracker.lock() {
404 tracker.consistent(|| compare(authoritative, nonauthoritative));
405 } else {
406 error!("Failed to acquire tracker lock. Cannot track consistency.");
407 }
408 }
409 }
410
411 authoritative_result
412}
413
414async fn write_both<P, T, FA, FB>(
415 mut authoritative: Executor<'_, P, T, FA>,
416 mut nonauthoritative: Executor<'_, P, T, FB>,
417) -> MigrationWriteResult<T>
418where
419 P: Send + Sync,
420 T: Send + Sync,
421 FA: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
422 FB: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
423{
424 let authoritative_result = authoritative.run().await;
425
426 if authoritative_result.result.is_err() {
427 return MigrationWriteResult {
428 authoritative: authoritative_result,
429 nonauthoritative: None,
430 };
431 }
432
433 let nonauthoritative_result = nonauthoritative.run().await;
434
435 MigrationWriteResult {
436 authoritative: authoritative_result,
437 nonauthoritative: Some(nonauthoritative_result),
438 }
439}
440
441struct Executor<'a, P, T, F>
442where
443 P: Send + Sync,
444 T: Send + Sync,
445 F: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
446{
447 origin: Origin,
448 function: &'a F,
449 tracker: Arc<Mutex<MigrationOpTracker>>,
450 measure_latency: bool,
451 measure_errors: bool,
452 payload: &'a P,
453}
454
455impl<P, T, F> Executor<'_, P, T, F>
456where
457 P: Send + Sync,
458 T: Send + Sync,
459 F: Fn(&P) -> BoxFuture<MigrationResult<T>> + Sync + Send,
460{
461 async fn run(&mut self) -> MigrationOriginResult<T> {
462 let start = Instant::now();
463 let result = (self.function)(self.payload).await;
464 let elapsed = start.elapsed();
465
466 let result = match self.tracker.lock() {
467 Ok(mut tracker) => {
468 if self.measure_latency {
469 tracker.latency(self.origin, elapsed);
470 }
471
472 if self.measure_errors && result.is_err() {
473 tracker.error(self.origin);
474 }
475
476 tracker.invoked(self.origin);
477
478 result
479 }
480 Err(_) => Err("Failed to acquire lock".into()),
481 };
482
483 MigrationOriginResult {
484 origin: self.origin,
485 result,
486 }
487 }
488}
489
490#[cfg(test)]
491mod tests {
492 use std::{
493 sync::{mpsc, Arc},
494 time::{Duration, Instant},
495 };
496
497 use crate::{
498 migrations::migrator::MigratorBuilder, Client, ConfigBuilder, ExecutionOrder, Stage,
499 };
500 use futures::future::FutureExt;
501 use launchdarkly_server_sdk_evaluation::ContextBuilder;
502 use test_case::test_case;
503
504 #[test]
505 fn can_build_successfully() {
506 let config = ConfigBuilder::new("sdk-key")
507 .offline(true)
508 .build()
509 .expect("config failed to build");
510
511 let client = Arc::new(Client::build(config).expect("client failed to build"));
512 let migrator = MigratorBuilder::new(client)
513 .track_latency(false)
514 .track_errors(false)
515 .read(
516 |_: &u32| async move { Ok(()) }.boxed(),
517 |_: &u32| async move { Ok(()) }.boxed(),
518 Some(|_, _| true),
519 )
520 .write(
521 |_: &u32| async move { Ok(()) }.boxed(),
522 |_: &u32| async move { Ok(()) }.boxed(),
523 )
524 .build();
525
526 assert!(migrator.is_ok());
527 }
528
529 #[tokio::test]
530 async fn read_passes_payload_through() {
531 let config = ConfigBuilder::new("sdk-key")
532 .offline(true)
533 .build()
534 .expect("config failed to build");
535
536 let client = Arc::new(Client::build(config).expect("client failed to build"));
537 client.start_with_default_executor();
538
539 let (sender, receiver) = mpsc::channel();
540 let old_sender = sender.clone();
541 let new_sender = sender.clone();
542 let mut migrator = MigratorBuilder::new(client)
543 .track_latency(false)
544 .track_errors(false)
545 .write(
546 |_| async move { Ok(0) }.boxed(),
547 |_| async move { Ok(0) }.boxed(),
548 )
549 .read_execution_order(ExecutionOrder::Serial)
550 .read(
551 move |&payload| {
552 let old_sender = old_sender.clone();
553 async move {
554 old_sender.send(payload).unwrap();
555 Ok(0)
556 }
557 .boxed()
558 },
559 move |&payload| {
560 let new_sender = new_sender.clone();
561 async move {
562 new_sender.send(payload).unwrap();
563 Ok(0)
564 }
565 .boxed()
566 },
567 None,
568 )
569 .build()
570 .expect("migrator failed to build");
571
572 let _result = migrator
573 .read(
574 &ContextBuilder::new("user-key")
575 .build()
576 .expect("context failed to build"),
577 "migration-key".into(),
578 crate::Stage::Shadow,
579 1,
580 )
581 .await;
582
583 let old_payload = receiver.recv().unwrap();
584 let new_payload = receiver.recv().unwrap();
585
586 assert_eq!(old_payload, 1);
587 assert_eq!(new_payload, 1);
588 }
589
590 #[tokio::test]
591 async fn write_passes_payload_through() {
592 let config = ConfigBuilder::new("sdk-key")
593 .offline(true)
594 .build()
595 .expect("config failed to build");
596
597 let client = Arc::new(Client::build(config).expect("client failed to build"));
598 client.start_with_default_executor();
599
600 let (sender, receiver) = mpsc::channel();
601 let old_sender = sender.clone();
602 let new_sender = sender.clone();
603 let mut migrator = MigratorBuilder::new(client)
604 .track_latency(false)
605 .track_errors(false)
606 .read(
607 |_| async move { Ok(0) }.boxed(),
608 |_| async move { Ok(0) }.boxed(),
609 Some(|_, _| true),
610 )
611 .write(
612 move |&payload| {
613 let old_sender = old_sender.clone();
614 async move {
615 old_sender.send(payload).unwrap();
616 Ok(0)
617 }
618 .boxed()
619 },
620 move |&payload| {
621 let new_sender = new_sender.clone();
622 async move {
623 new_sender.send(payload).unwrap();
624 Ok(0)
625 }
626 .boxed()
627 },
628 )
629 .build()
630 .expect("migrator failed to build");
631
632 let _result = migrator
633 .write(
634 &ContextBuilder::new("user-key")
635 .build()
636 .expect("context failed to build"),
637 "migration-key".into(),
638 crate::Stage::Shadow,
639 1,
640 )
641 .await;
642
643 let old_payload = receiver.recv().unwrap();
644 let new_payload = receiver.recv().unwrap();
645
646 assert_eq!(old_payload, 1);
647 assert_eq!(new_payload, 1);
648 }
649
650 #[tokio::test]
651 async fn read_handles_correct_origin() {
652 read_handles_correct_origin_driver(Stage::Off, true, false).await;
653 read_handles_correct_origin_driver(Stage::DualWrite, true, false).await;
654 read_handles_correct_origin_driver(Stage::Shadow, true, true).await;
655 read_handles_correct_origin_driver(Stage::Live, true, true).await;
656 read_handles_correct_origin_driver(Stage::Rampdown, false, true).await;
657 read_handles_correct_origin_driver(Stage::Complete, false, true).await;
658 }
659
660 async fn read_handles_correct_origin_driver(
661 stage: Stage,
662 expected_old: bool,
663 expected_new: bool,
664 ) {
665 let config = ConfigBuilder::new("sdk-key")
666 .offline(true)
667 .build()
668 .expect("config failed to build");
669
670 let client = Arc::new(Client::build(config).expect("client failed to build"));
671 client.start_with_default_executor();
672
673 let (sender, receiver) = mpsc::channel();
674 let old_sender = sender.clone();
675 let new_sender = sender.clone();
676 let mut migrator = MigratorBuilder::new(client)
677 .track_latency(false)
678 .track_errors(false)
679 .write(
680 |_| async move { Ok("write") }.boxed(),
681 |_| async move { Ok("write") }.boxed(),
682 )
683 .read_execution_order(ExecutionOrder::Serial)
684 .read(
685 move |_| {
686 let old_sender = old_sender.clone();
687 async move {
688 old_sender.send("old").unwrap();
689 Ok("read")
690 }
691 .boxed()
692 },
693 move |_| {
694 let new_sender = new_sender.clone();
695 async move {
696 new_sender.send("new").unwrap();
697 Ok("read")
698 }
699 .boxed()
700 },
701 None,
702 )
703 .build()
704 .expect("migrator failed to build");
705
706 let _result = migrator
707 .read(
708 &ContextBuilder::new("user-key")
709 .build()
710 .expect("context failed to build"),
711 "migration-key".into(),
712 stage,
713 "payload",
714 )
715 .await;
716
717 let payloads = receiver.try_iter().collect::<Vec<_>>();
718
719 if expected_old {
720 assert!(payloads.contains(&"old"));
721 } else {
722 assert!(!payloads.contains(&"old"));
723 }
724
725 if expected_new {
726 assert!(payloads.contains(&"new"));
727 } else {
728 assert!(!payloads.contains(&"new"));
729 }
730 }
731
732 #[tokio::test]
733 async fn read_handles_concurrent_execution() {
734 let config = ConfigBuilder::new("sdk-key")
735 .offline(true)
736 .build()
737 .expect("config failed to build");
738
739 let client = Arc::new(Client::build(config).expect("client failed to build"));
740 client.start_with_default_executor();
741
742 let mut migrator = MigratorBuilder::new(client)
743 .track_latency(false)
744 .track_errors(false)
745 .write(
746 |_| async move { Ok(()) }.boxed(),
747 |_| async move { Ok(()) }.boxed(),
748 )
749 .read_execution_order(ExecutionOrder::Concurrent)
750 .read(
751 |_| {
752 async move {
753 async_std::task::sleep(std::time::Duration::from_millis(250)).await;
754 Ok(())
755 }
756 .boxed()
757 },
758 |_| {
759 async move {
760 async_std::task::sleep(std::time::Duration::from_millis(250)).await;
761 Ok(())
762 }
763 .boxed()
764 },
765 None,
766 )
767 .build()
768 .expect("migrator failed to build");
769
770 let start = Instant::now();
771 let _result = migrator
772 .read(
773 &ContextBuilder::new("user-key")
774 .build()
775 .expect("context failed to build"),
776 "migration-key".into(),
777 crate::Stage::Shadow,
778 (),
779 )
780 .await;
781 let elapsed = start.elapsed();
782 assert!(elapsed < Duration::from_millis(500));
783 }
784
785 #[tokio::test]
786 async fn read_handles_nonconcurrent_execution() {
787 read_handles_nonconcurrent_execution_driver(ExecutionOrder::Serial).await;
788 read_handles_nonconcurrent_execution_driver(ExecutionOrder::Random).await;
789 }
790
791 async fn read_handles_nonconcurrent_execution_driver(execution_order: ExecutionOrder) {
792 let config = ConfigBuilder::new("sdk-key")
793 .offline(true)
794 .build()
795 .expect("config failed to build");
796
797 let client = Arc::new(Client::build(config).expect("client failed to build"));
798 client.start_with_default_executor();
799
800 let mut migrator = MigratorBuilder::new(client)
801 .track_latency(false)
802 .track_errors(false)
803 .write(
804 |_| async move { Ok(()) }.boxed(),
805 |_| async move { Ok(()) }.boxed(),
806 )
807 .read_execution_order(execution_order)
808 .read(
809 |_| {
810 async move {
811 std::thread::sleep(std::time::Duration::from_millis(250));
812 Ok(())
813 }
814 .boxed()
815 },
816 |_| {
817 async move {
818 std::thread::sleep(std::time::Duration::from_millis(250));
819 Ok(())
820 }
821 .boxed()
822 },
823 None,
824 )
825 .build()
826 .expect("migrator failed to build");
827
828 let start = Instant::now();
829 let _result = migrator
830 .read(
831 &ContextBuilder::new("user-key")
832 .build()
833 .expect("context failed to build"),
834 "migration-key".into(),
835 crate::Stage::Shadow,
836 (),
837 )
838 .await;
839 let elapsed = start.elapsed();
840 assert!(elapsed >= Duration::from_millis(500));
841 }
842
843 #[tokio::test]
844 async fn write_handles_correct_origin() {
845 write_handles_correct_origin_driver(Stage::Off, true, false).await;
846 write_handles_correct_origin_driver(Stage::DualWrite, true, true).await;
847 write_handles_correct_origin_driver(Stage::Shadow, true, true).await;
848 write_handles_correct_origin_driver(Stage::Live, true, true).await;
849 write_handles_correct_origin_driver(Stage::Rampdown, true, true).await;
850 write_handles_correct_origin_driver(Stage::Complete, false, true).await;
851 }
852
853 async fn write_handles_correct_origin_driver(
854 stage: Stage,
855 expected_old: bool,
856 expected_new: bool,
857 ) {
858 let config = ConfigBuilder::new("sdk-key")
859 .offline(true)
860 .build()
861 .expect("config failed to build");
862
863 let client = Arc::new(Client::build(config).expect("client failed to build"));
864 client.start_with_default_executor();
865
866 let (sender, receiver) = mpsc::channel();
867 let old_sender = sender.clone();
868 let new_sender = sender.clone();
869 let mut migrator = MigratorBuilder::new(client)
870 .track_latency(false)
871 .track_errors(false)
872 .read(
873 |_| async move { Ok(()) }.boxed(),
874 |_| async move { Ok(()) }.boxed(),
875 Some(|_, _| true),
876 )
877 .write(
878 move |_| {
879 let old_sender = old_sender.clone();
880 async move {
881 old_sender.send("old").unwrap();
882 Ok(())
883 }
884 .boxed()
885 },
886 move |_| {
887 let new_sender = new_sender.clone();
888 async move {
889 new_sender.send("new").unwrap();
890 Ok(())
891 }
892 .boxed()
893 },
894 )
895 .build()
896 .expect("migrator failed to build");
897
898 let _result = migrator
899 .write(
900 &ContextBuilder::new("user-key")
901 .build()
902 .expect("context failed to build"),
903 "migration-key".into(),
904 stage,
905 (),
906 )
907 .await;
908
909 let payloads = receiver.try_iter().collect::<Vec<_>>();
910
911 if expected_old {
912 assert!(payloads.contains(&"old"));
913 } else {
914 assert!(!payloads.contains(&"old"));
915 }
916
917 if expected_new {
918 assert!(payloads.contains(&"new"));
919 } else {
920 assert!(!payloads.contains(&"new"));
921 }
922 }
923
924 #[tokio::test]
925 async fn write_stops_if_authoritative_fails() {
926 write_stops_if_authoritative_fails_driver(Stage::DualWrite, true, false).await;
930 write_stops_if_authoritative_fails_driver(Stage::Shadow, true, false).await;
931 write_stops_if_authoritative_fails_driver(Stage::Live, false, true).await;
932 write_stops_if_authoritative_fails_driver(Stage::Rampdown, false, true).await;
933
934 }
937
938 async fn write_stops_if_authoritative_fails_driver(
939 stage: Stage,
940 expected_old: bool,
941 expected_new: bool,
942 ) {
943 let config = ConfigBuilder::new("sdk-key")
944 .offline(true)
945 .build()
946 .expect("config failed to build");
947
948 let client = Arc::new(Client::build(config).expect("client failed to build"));
949 client.start_with_default_executor();
950
951 let (sender, receiver) = mpsc::channel();
952 let old_sender = sender.clone();
953 let new_sender = sender.clone();
954 let mut migrator = MigratorBuilder::new(client)
955 .track_latency(false)
956 .track_errors(false)
957 .read(
958 |_| async move { Ok(()) }.boxed(),
959 |_| async move { Ok(()) }.boxed(),
960 Some(|_, _| true),
961 )
962 .write(
963 move |_| {
964 let old_sender = old_sender.clone();
965 async move {
966 old_sender.send("old").unwrap();
967 Err("error".into())
968 }
969 .boxed()
970 },
971 move |_| {
972 let new_sender = new_sender.clone();
973 async move {
974 new_sender.send("new").unwrap();
975 Err("error".into())
976 }
977 .boxed()
978 },
979 )
980 .build()
981 .expect("migrator failed to build");
982
983 let _result = migrator
984 .write(
985 &ContextBuilder::new("user-key")
986 .build()
987 .expect("context failed to build"),
988 "migration-key".into(),
989 stage,
990 (),
991 )
992 .await;
993
994 let payloads = receiver.try_iter().collect::<Vec<_>>();
995
996 if expected_old {
997 assert!(payloads.contains(&"old"));
998 } else {
999 assert!(!payloads.contains(&"old"));
1000 }
1001
1002 if expected_new {
1003 assert!(payloads.contains(&"new"));
1004 } else {
1005 assert!(!payloads.contains(&"new"));
1006 }
1007 }
1008
1009 #[test_case(ExecutionOrder::Serial)]
1010 #[test_case(ExecutionOrder::Random)]
1011 #[test_case(ExecutionOrder::Concurrent)]
1012 fn can_modify_execution_order(execution_order: ExecutionOrder) {
1013 let config = ConfigBuilder::new("sdk-key")
1014 .offline(true)
1015 .build()
1016 .expect("config failed to build");
1017
1018 let client = Arc::new(Client::build(config).expect("client failed to build"));
1019 let migrator = MigratorBuilder::new(client)
1020 .track_latency(false)
1021 .track_errors(false)
1022 .read(
1023 |_: &u32| async move { Ok(()) }.boxed(),
1024 |_: &u32| async move { Ok(()) }.boxed(),
1025 Some(|_, _| true),
1026 )
1027 .write(
1028 |_: &u32| async move { Ok(()) }.boxed(),
1029 |_: &u32| async move { Ok(()) }.boxed(),
1030 )
1031 .read_execution_order(execution_order)
1032 .build();
1033
1034 assert!(migrator.is_ok());
1035 }
1036}