launchdarkly_server_sdk/events/
dispatcher.rs

1use crossbeam_channel::{bounded, select, tick, Receiver, Sender};
2use rand::rng;
3use std::time::SystemTime;
4
5use launchdarkly_server_sdk_evaluation::Context;
6use lru::LruCache;
7
8use super::event::{BaseEvent, FeatureRequestEvent, IndexEvent};
9use crate::sampler::{Sampler, ThreadRngSampler};
10
11use super::sender::EventSenderResult;
12use super::{
13    event::{EventSummary, InputEvent, OutputEvent},
14    EventsConfiguration,
15};
16
17struct Outbox {
18    events: Vec<OutputEvent>,
19    summary: EventSummary,
20    capacity_exceeded: bool,
21    capacity: usize,
22}
23
24impl Outbox {
25    fn new(capacity: usize) -> Self {
26        Self {
27            events: Vec::with_capacity(capacity),
28            summary: EventSummary::new(),
29            capacity_exceeded: false,
30            capacity,
31        }
32    }
33
34    fn add_event(&mut self, output_event: OutputEvent) {
35        if self.events.len() == self.capacity {
36            if !self.capacity_exceeded {
37                self.capacity_exceeded = true;
38                warn!("Exceeded event queue capacity. Increase capacity to avoid dropping events.");
39            }
40            return;
41        }
42
43        self.capacity_exceeded = false;
44        self.events.push(output_event);
45    }
46
47    fn add_to_summary(&mut self, event: &FeatureRequestEvent) {
48        self.summary.add(event);
49    }
50
51    fn get_payload(&mut self) -> Vec<OutputEvent> {
52        let mut payload = Vec::with_capacity(self.capacity + 1);
53        payload.append(&mut self.events);
54
55        if !self.summary.is_empty() {
56            payload.push(OutputEvent::Summary(self.summary.clone()));
57            self.summary.reset();
58        }
59
60        payload
61    }
62
63    fn is_empty(&self) -> bool {
64        self.events.is_empty() && self.summary.is_empty()
65    }
66
67    fn reset(&mut self) {
68        self.events.clear();
69        self.summary.reset();
70    }
71}
72
/// State owned by the event dispatch loop: the pending outbox, the set of contexts that
/// have already been indexed, and bookkeeping derived from delivery results.
pub(super) struct EventDispatcher {
    /// Events and summary data buffered until the next flush.
    outbox: Outbox,
    /// Canonical keys of contexts already noticed; a key that is present suppresses a
    /// further index event for that context. Cleared on the context-keys flush interval.
    context_keys: LruCache<String, ()>,
    /// Configuration supplied at construction (capacities, intervals, privacy settings,
    /// event sender, success callback).
    events_configuration: EventsConfiguration,
    /// Largest `time_from_server` seen in a successful send result; debug events are only
    /// emitted while a flag's debug-until date is later than this value.
    /// NOTE(review): presumably milliseconds since the Unix epoch, since it is compared
    /// against the same value as `as_millis()` of the local clock — confirm.
    last_known_time: u128,
    /// Set when a send result reports `must_shutdown`; afterwards incoming events are
    /// ignored and flushes send nothing.
    disabled: bool,
    /// Number of worker threads given to the sending runtime, and the capacity of the
    /// channel that carries send results back to the dispatcher.
    thread_count: usize,
    /// Decides, from an event's sampling ratio, whether the event is emitted.
    sampler: Box<dyn Sampler>,
}
82
83impl EventDispatcher {
84    pub(super) fn new(events_configuration: EventsConfiguration) -> Self {
85        Self {
86            outbox: Outbox::new(events_configuration.capacity),
87            context_keys: LruCache::<String, ()>::new(events_configuration.context_keys_capacity),
88            events_configuration,
89            last_known_time: 0,
90            disabled: false,
91            thread_count: 5,
92            sampler: Box::new(ThreadRngSampler::new(rng())),
93        }
94    }
95
96    pub(super) fn start(&mut self, inbox_rx: Receiver<EventDispatcherMessage>) {
97        let reset_context_cache_ticker =
98            tick(self.events_configuration.context_keys_flush_interval);
99        let flush_ticker = tick(self.events_configuration.flush_interval);
100        let (event_result_tx, event_result_rx) = bounded::<EventSenderResult>(self.thread_count);
101
102        let rt = tokio::runtime::Builder::new_multi_thread()
103            .worker_threads(self.thread_count)
104            .enable_io()
105            .enable_time()
106            .build();
107
108        let rt = match rt {
109            Ok(rt) => rt,
110            Err(e) => {
111                error!("Could not start runtime for event sending: {}", e);
112                return;
113            }
114        };
115
116        let (send, recv) = bounded::<()>(1);
117
118        loop {
119            debug!("waiting for a batch to send");
120
121            loop {
122                select! {
123                    recv(event_result_rx) -> result => match result {
124                        Ok(result) if result.success => {
125                            (self.events_configuration.on_success)(&result);
126                            self.last_known_time = std::cmp::max(result.time_from_server, self.last_known_time);
127                        },
128                        Ok(result) if result.must_shutdown => {
129                            self.disabled = true;
130                            self.outbox.reset();
131                        },
132                        Ok(_) => continue,
133                        Err(e) => {
134                            error!("event_result_rx is disconnected. Shutting down dispatcher: {}", e);
135                            return;
136                        }
137                    },
138                    recv(reset_context_cache_ticker) -> _ => self.context_keys.clear(),
139                    recv(flush_ticker) -> _ => break,
140                    recv(inbox_rx) -> result => match result {
141                        Ok(EventDispatcherMessage::Flush) => break,
142                        Ok(EventDispatcherMessage::EventMessage(event)) => {
143                            if !self.disabled {
144                                self.process_event(event);
145                            }
146                        }
147                        Ok(EventDispatcherMessage::Close(sender)) => {
148                            drop(send);
149                            //Should unblock once all the senders are dropped.
150                            let _ = recv.recv();
151
152                            // We call drop here to make sure this receiver is completely
153                            // disconnected. This ensures the event processor cannot send another
154                            // message. We could rely on Rust to drop this during the normal course
155                            // of operation, but there is a small chance for a deadlock issue if we
156                            // call EventProcessor::close twice in rapid succession.
157                            drop(inbox_rx);
158
159                            let _ = sender.send(());
160
161                            return;
162                        }
163                        Err(e) => {
164                            error!("inbox_rx is disconnected. Shutting down dispatcher: {}", e);
165                            return;
166                        }
167                    }
168                }
169            }
170
171            if self.disabled {
172                continue;
173            }
174
175            if !self.outbox.is_empty() {
176                let payload = self.outbox.get_payload();
177
178                debug!("Sending batch of {} events", payload.len());
179
180                let sender = self.events_configuration.event_sender.clone();
181                let results = event_result_tx.clone();
182                let send = send.clone();
183                rt.spawn(async move {
184                    sender.send_event_data(payload, results).await;
185                    drop(send);
186                });
187            }
188        }
189    }
190
191    fn process_event(&mut self, event: InputEvent) {
192        match event {
193            InputEvent::MigrationOp(migration_op) => {
194                if self
195                    .sampler
196                    .sample(migration_op.sampling_ratio.unwrap_or(1))
197                {
198                    self.outbox
199                        .add_event(OutputEvent::MigrationOp(migration_op));
200                }
201            }
202            InputEvent::FeatureRequest(fre) => {
203                if !fre.exclude_from_summaries {
204                    self.outbox.add_to_summary(&fre);
205                }
206
207                let inlined = fre.clone().into_inline_with_anonymous_redaction(
208                    self.events_configuration.all_attributes_private,
209                    self.events_configuration.private_attributes.clone(),
210                );
211
212                if let Some(context) = self.get_indexable_context(&fre.base) {
213                    let base = BaseEvent::new(fre.base.creation_date, context).into_inline(
214                        self.events_configuration.all_attributes_private,
215                        self.events_configuration.private_attributes.clone(),
216                    );
217                    self.outbox
218                        .add_event(OutputEvent::Index(IndexEvent::from(base)));
219                }
220
221                let now = match SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
222                    Ok(time) => time.as_millis(),
223                    _ => 0,
224                };
225
226                if let Some(debug_events_until_date) = fre.debug_events_until_date {
227                    let time = u128::from(debug_events_until_date);
228                    if time > now
229                        && time > self.last_known_time
230                        && self.sampler.sample(fre.sampling_ratio.unwrap_or(1))
231                    {
232                        self.outbox
233                            .add_event(OutputEvent::Debug(fre.clone().into_inline(
234                                self.events_configuration.all_attributes_private,
235                                self.events_configuration.private_attributes.clone(),
236                            )));
237                    }
238                }
239
240                if fre.track_events && self.sampler.sample(fre.sampling_ratio.unwrap_or(1)) {
241                    self.outbox.add_event(OutputEvent::FeatureRequest(inlined));
242                }
243            }
244            InputEvent::Identify(mut identify) => {
245                if self.events_configuration.omit_anonymous_contexts {
246                    match identify.base.context.without_anonymous_contexts() {
247                        Ok(context) => identify.base.context = context,
248                        Err(_) => return,
249                    }
250                }
251
252                self.notice_context(&identify.base.context);
253                if self.sampler.sample(identify.sampling_ratio.unwrap_or(1)) {
254                    self.outbox
255                        .add_event(OutputEvent::Identify(identify.into_inline(
256                            self.events_configuration.all_attributes_private,
257                            self.events_configuration.private_attributes.clone(),
258                        )));
259                }
260            }
261            InputEvent::Custom(custom) => {
262                if let Some(context) = self.get_indexable_context(&custom.base) {
263                    let base = BaseEvent::new(custom.base.creation_date, context).into_inline(
264                        self.events_configuration.all_attributes_private,
265                        self.events_configuration.private_attributes.clone(),
266                    );
267                    self.outbox
268                        .add_event(OutputEvent::Index(IndexEvent::from(base)));
269                }
270
271                if self.sampler.sample(custom.sampling_ratio.unwrap_or(1)) {
272                    self.outbox.add_event(OutputEvent::Custom(custom));
273                }
274            }
275        }
276    }
277
278    fn get_indexable_context(&mut self, event: &BaseEvent) -> Option<Context> {
279        let context = match self.events_configuration.omit_anonymous_contexts {
280            true => event.context.without_anonymous_contexts(),
281            false => Ok(event.context.clone()),
282        };
283
284        if let Ok(ctx) = context {
285            if self.notice_context(&ctx) {
286                return Some(ctx);
287            }
288
289            return None;
290        }
291
292        None
293    }
294
295    fn notice_context(&mut self, context: &Context) -> bool {
296        let key = context.canonical_key();
297
298        if self.context_keys.get(key).is_none() {
299            trace!("noticing new context {:?}", key);
300            self.context_keys.put(key.to_owned(), ());
301            true
302        } else {
303            trace!("ignoring already-seen context {:?}", key);
304            false
305        }
306    }
307}
308
/// Messages accepted on the dispatcher's inbox channel.
// NOTE(review): the lint is presumably silenced because `InputEvent` is much larger than
// the other variants and boxing it was not wanted — confirm.
#[allow(clippy::large_enum_variant)]
pub(super) enum EventDispatcherMessage {
    /// An event to process into the outbox; ignored once the dispatcher is disabled.
    EventMessage(InputEvent),
    /// Request to send whatever the outbox currently holds.
    Flush,
    /// Request to stop the dispatcher. It waits for in-flight sends to finish, then
    /// signals completion on the enclosed sender and exits its loop.
    Close(Sender<()>),
}
315
#[cfg(test)]
mod tests {
    use std::thread;
    use std::time::Duration;

    use super::*;
    use crate::events::event::{EventFactory, OutputEvent};
    use crate::events::{create_event_sender, create_events_configuration};
    use crate::test_common::basic_flag;
    use launchdarkly_server_sdk_evaluation::{
        ContextBuilder, Detail, FlagValue, MultiContextBuilder, Reason,
    };
    use test_case::test_case;

    /// Debug-until date far enough in the future that the debug window is always open.
    const FAR_FUTURE_DEBUG_DATE: u64 = 64_060_606_800_000;

    /// Builds the evaluation event used by the feature-request tests: a `false` value at
    /// variation 1 with a fallthrough reason, a `false` default, and no prerequisite.
    macro_rules! eval_event {
        ($flag:ident, $context:expr) => {
            EventFactory::new(true).new_eval_event(
                &$flag.key,
                $context,
                &$flag,
                Detail {
                    value: Some(FlagValue::from(false)),
                    variation_index: Some(1),
                    reason: Reason::Fallthrough {
                        in_experiment: false,
                    },
                },
                FlagValue::from(false),
                None,
            )
        };
    }

    fn create_dispatcher(events_configuration: EventsConfiguration) -> EventDispatcher {
        EventDispatcher::new(events_configuration)
    }

    /// Dispatcher with the stock test configuration and a long flush interval.
    fn default_dispatcher() -> EventDispatcher {
        let (event_sender, _) = create_event_sender();
        create_dispatcher(create_events_configuration(
            event_sender,
            Duration::from_secs(100),
        ))
    }

    /// Same as `default_dispatcher`, but with `omit_anonymous_contexts` switched on.
    fn anonymous_omitting_dispatcher() -> EventDispatcher {
        let (event_sender, _) = create_event_sender();
        let mut events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        events_configuration.omit_anonymous_contexts = true;
        create_dispatcher(events_configuration)
    }

    /// Single-kind context with the given key.
    fn context_with_key(key: &str) -> Context {
        ContextBuilder::new(key)
            .build()
            .expect("Failed to create context")
    }

    /// Single-kind anonymous context with the given key.
    fn anonymous_context(key: &str) -> Context {
        ContextBuilder::new(key)
            .anonymous(true)
            .build()
            .expect("Failed to create context")
    }

    /// Multi-kind context made of an anonymous "user" and a non-anonymous "org".
    fn anonymous_user_with_org() -> Context {
        let user_context = anonymous_context("user");
        let org_context = ContextBuilder::new("org")
            .kind("org")
            .build()
            .expect("Failed to create context");

        MultiContextBuilder::new()
            .add_context(user_context)
            .add_context(org_context)
            .build()
            .expect("Failed to create context")
    }

    /// Kinds of the events currently queued in the dispatcher's outbox, in order.
    fn queued_kinds(dispatcher: &EventDispatcher) -> Vec<&str> {
        dispatcher
            .outbox
            .events
            .iter()
            .map(|event| event.kind())
            .collect()
    }

    /// Starts a dispatcher on its own thread and returns its inbox together with the
    /// thread handle. The dispatcher is created inside the thread; `last_known_time`, when
    /// given, is applied before the loop starts.
    fn start_dispatcher_thread(
        events_configuration: EventsConfiguration,
        last_known_time: Option<u128>,
    ) -> (Sender<EventDispatcherMessage>, thread::JoinHandle<()>) {
        let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);

        let handle = thread::Builder::new()
            .spawn(move || {
                let mut dispatcher = create_dispatcher(events_configuration);
                if let Some(time) = last_known_time {
                    dispatcher.last_known_time = time;
                }
                dispatcher.start(inbox_rx)
            })
            .unwrap();

        (inbox_tx, handle)
    }

    /// Asks the dispatcher to close, waits for its acknowledgement, then joins its thread.
    fn close_and_join(inbox_tx: &Sender<EventDispatcherMessage>, handle: thread::JoinHandle<()>) {
        let (tx, rx) = bounded(1);
        inbox_tx
            .send(EventDispatcherMessage::Close(tx))
            .expect("failed to close");
        rx.recv().expect("failed to notify on close");
        handle.join().unwrap();
    }

    #[test]
    fn get_payload_from_outbox_empties_outbox() {
        let mut dispatcher = default_dispatcher();
        let identify = EventFactory::new(true).new_identify(context_with_key("context"));

        dispatcher.process_event(identify);
        let _ = dispatcher.outbox.get_payload();

        assert!(dispatcher.outbox.is_empty());
    }

    #[test]
    fn dispatcher_ignores_events_over_capacity() {
        let mut dispatcher = default_dispatcher();
        let context = context_with_key("context");
        let event_factory = EventFactory::new(true);

        (0..10).for_each(|_| {
            dispatcher.process_event(event_factory.new_identify(context.clone()));
        });

        assert_eq!(5, dispatcher.outbox.events.len());
        assert!(queued_kinds(&dispatcher)
            .into_iter()
            .all(|kind| kind == "identify"));
        assert_eq!(1, dispatcher.context_keys.len());
    }

    #[test]
    fn dispatcher_handles_feature_request_events_correctly() {
        let mut dispatcher = default_dispatcher();

        let mut flag = basic_flag("flag");
        flag.debug_events_until_date = Some(FAR_FUTURE_DEBUG_DATE);
        flag.track_events = true;

        dispatcher.process_event(eval_event!(flag, context_with_key("context")));

        assert_eq!(
            vec!["index", "debug", "feature"],
            queued_kinds(&dispatcher)
        );
        assert_eq!(1, dispatcher.context_keys.len());
        assert_eq!(1, dispatcher.outbox.summary.features.len());
    }

    #[test]
    fn dispatcher_strips_anonymous_contexts_from_index_for_feature_request_events() {
        let mut dispatcher = anonymous_omitting_dispatcher();

        let mut flag = basic_flag("flag");
        flag.debug_events_until_date = Some(FAR_FUTURE_DEBUG_DATE);
        flag.track_events = true;

        dispatcher.process_event(eval_event!(flag, anonymous_context("context")));

        assert_eq!(vec!["debug", "feature"], queued_kinds(&dispatcher));
        assert_eq!(0, dispatcher.context_keys.len());
        assert_eq!(1, dispatcher.outbox.summary.features.len());
    }

    #[test]
    fn dispatcher_ignores_feature_events_with_0_sampling_ratio() {
        let mut dispatcher = default_dispatcher();

        let mut flag = basic_flag("flag");
        flag.sampling_ratio = Some(0);
        flag.debug_events_until_date = Some(FAR_FUTURE_DEBUG_DATE);
        flag.track_events = true;

        dispatcher.process_event(eval_event!(flag, context_with_key("context")));

        assert_eq!(vec!["index"], queued_kinds(&dispatcher));
        assert_eq!(1, dispatcher.context_keys.len());
        assert_eq!(1, dispatcher.outbox.summary.features.len());
    }

    #[test]
    fn dispatcher_can_exclude_feature_event_from_summaries() {
        let mut dispatcher = default_dispatcher();

        let mut flag = basic_flag("flag");
        flag.exclude_from_summaries = true;
        flag.debug_events_until_date = Some(FAR_FUTURE_DEBUG_DATE);
        flag.track_events = true;

        dispatcher.process_event(eval_event!(flag, context_with_key("context")));

        assert_eq!(
            vec!["index", "debug", "feature"],
            queued_kinds(&dispatcher)
        );
        assert_eq!(1, dispatcher.context_keys.len());
        assert_eq!(0, dispatcher.outbox.summary.features.len());
    }

    #[test_case(0, 64_060_606_800_000, vec!["debug", "index", "summary"])]
    #[test_case(64_060_606_800_000, 64_060_606_800_000, vec!["index", "summary"])]
    #[test_case(64_060_606_800_001, 64_060_606_800_000, vec!["index", "summary"])]
    fn sending_feature_event_emits_debug_event_correctly(
        last_known_time: u128,
        debug_events_until_date: u64,
        expected_event_types: Vec<&str>,
    ) {
        let mut flag = basic_flag("flag");
        flag.debug_events_until_date = Some(debug_events_until_date);
        let feature_request = eval_event!(flag, context_with_key("foo"));

        let (event_sender, event_rx) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let (inbox_tx, dispatcher_handle) =
            start_dispatcher_thread(events_configuration, Some(last_known_time));

        inbox_tx
            .send(EventDispatcherMessage::EventMessage(feature_request))
            .expect("event send failed");
        inbox_tx
            .send(EventDispatcherMessage::Flush)
            .expect("flush failed");
        close_and_join(&inbox_tx, dispatcher_handle);

        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
        assert_eq!(expected_event_types.len(), events.len());

        let kinds = events.iter().map(|event| event.kind()).collect::<Vec<_>>();
        assert!(expected_event_types
            .iter()
            .all(|event_type| kinds.contains(event_type)));
    }

    #[test]
    fn dispatcher_only_notices_identity_event_once() {
        let mut dispatcher = default_dispatcher();
        let context = context_with_key("context");
        let event_factory = EventFactory::new(true);

        dispatcher.process_event(event_factory.new_identify(context.clone()));
        dispatcher.process_event(event_factory.new_identify(context));

        assert_eq!(vec!["identify", "identify"], queued_kinds(&dispatcher));
        assert_eq!(1, dispatcher.context_keys.len());
    }

    #[test]
    fn dispatcher_can_ignore_identify_if_anonymous() {
        let mut dispatcher = anonymous_omitting_dispatcher();
        let identify = EventFactory::new(true).new_identify(anonymous_context("context"));

        dispatcher.process_event(identify);

        assert_eq!(0, dispatcher.outbox.events.len());
        assert_eq!(0, dispatcher.context_keys.len());
    }

    #[test]
    fn dispatcher_strips_anon_contexts_from_multi_kind_identify() {
        let mut dispatcher = anonymous_omitting_dispatcher();
        let identify = EventFactory::new(true).new_identify(anonymous_user_with_org());

        dispatcher.process_event(identify);

        assert_eq!(vec!["identify"], queued_kinds(&dispatcher));
        assert_eq!(1, dispatcher.context_keys.len());

        match &dispatcher.outbox.events[0] {
            OutputEvent::Identify(identify) => {
                assert_eq!("org:org", identify.base.context.canonical_key());
            }
            _ => panic!("Expected an identify event"),
        }
    }

    #[test]
    fn dispatcher_adds_index_on_custom_event() {
        let mut dispatcher = default_dispatcher();
        let custom_event = EventFactory::new(true)
            .new_custom(context_with_key("context"), "context", None, "")
            .expect("failed to make new custom event");

        dispatcher.process_event(custom_event);

        assert_eq!(vec!["index", "custom"], queued_kinds(&dispatcher));
        assert_eq!(1, dispatcher.context_keys.len());
    }

    #[test]
    fn dispatcher_can_strip_anonymous_from_index_events() {
        let mut dispatcher = anonymous_omitting_dispatcher();
        let custom_event = EventFactory::new(true)
            .new_custom(anonymous_context("context"), "context", None, "")
            .expect("failed to make new custom event");

        dispatcher.process_event(custom_event);

        assert_eq!(vec!["custom"], queued_kinds(&dispatcher));
        assert_eq!(0, dispatcher.context_keys.len());
    }

    #[test]
    fn dispatcher_can_strip_anonymous_from_index_events_with_multi_kinds() {
        let mut dispatcher = anonymous_omitting_dispatcher();
        let custom_event = EventFactory::new(true)
            .new_custom(anonymous_user_with_org(), "context", None, "")
            .expect("failed to make new custom event");

        dispatcher.process_event(custom_event);

        assert_eq!(vec!["index", "custom"], queued_kinds(&dispatcher));
        assert_eq!(1, dispatcher.context_keys.len());
    }

    #[test]
    fn can_process_events_successfully() {
        let (event_sender, event_rx) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_secs(100));
        let (inbox_tx, dispatcher_handle) = start_dispatcher_thread(events_configuration, None);

        let identify = EventFactory::new(true).new_identify(context_with_key("context"));
        inbox_tx
            .send(EventDispatcherMessage::EventMessage(identify))
            .expect("event send failed");
        inbox_tx
            .send(EventDispatcherMessage::Flush)
            .expect("flush failed");
        close_and_join(&inbox_tx, dispatcher_handle);

        assert_eq!(event_rx.iter().count(), 1);
    }

    #[test]
    fn dispatcher_flushes_periodically() {
        let (event_sender, event_rx) = create_event_sender();
        let events_configuration =
            create_events_configuration(event_sender, Duration::from_millis(200));
        // The thread handle is deliberately discarded; this test never joins the dispatcher.
        let (inbox_tx, _) = start_dispatcher_thread(events_configuration, None);

        let identify = EventFactory::new(true).new_identify(context_with_key("context"));
        inbox_tx
            .send(EventDispatcherMessage::EventMessage(identify))
            .expect("event send failed");

        // No explicit flush: wait past the 200ms interval and expect the ticker to have sent it.
        thread::sleep(Duration::from_millis(300));
        assert_eq!(event_rx.try_iter().count(), 1);
    }
}