1use crossbeam_channel::{bounded, select, tick, Receiver, Sender};
2use rand::rng;
3use std::time::SystemTime;
4
5use launchdarkly_server_sdk_evaluation::Context;
6use lru::LruCache;
7
8use super::event::{BaseEvent, FeatureRequestEvent, IndexEvent};
9use crate::sampler::{Sampler, ThreadRngSampler};
10
11use super::sender::EventSenderResult;
12use super::{
13 event::{EventSummary, InputEvent, OutputEvent},
14 EventsConfiguration,
15};
16
17struct Outbox {
18 events: Vec<OutputEvent>,
19 summary: EventSummary,
20 capacity_exceeded: bool,
21 capacity: usize,
22}
23
24impl Outbox {
25 fn new(capacity: usize) -> Self {
26 Self {
27 events: Vec::with_capacity(capacity),
28 summary: EventSummary::new(),
29 capacity_exceeded: false,
30 capacity,
31 }
32 }
33
34 fn add_event(&mut self, output_event: OutputEvent) {
35 if self.events.len() == self.capacity {
36 if !self.capacity_exceeded {
37 self.capacity_exceeded = true;
38 warn!("Exceeded event queue capacity. Increase capacity to avoid dropping events.");
39 }
40 return;
41 }
42
43 self.capacity_exceeded = false;
44 self.events.push(output_event);
45 }
46
47 fn add_to_summary(&mut self, event: &FeatureRequestEvent) {
48 self.summary.add(event);
49 }
50
51 fn get_payload(&mut self) -> Vec<OutputEvent> {
52 let mut payload = Vec::with_capacity(self.capacity + 1);
53 payload.append(&mut self.events);
54
55 if !self.summary.is_empty() {
56 payload.push(OutputEvent::Summary(self.summary.clone()));
57 self.summary.reset();
58 }
59
60 payload
61 }
62
63 fn is_empty(&self) -> bool {
64 self.events.is_empty() && self.summary.is_empty()
65 }
66
67 fn reset(&mut self) {
68 self.events.clear();
69 self.summary.reset();
70 }
71}
72
73pub(super) struct EventDispatcher {
74 outbox: Outbox,
75 context_keys: LruCache<String, ()>,
76 events_configuration: EventsConfiguration,
77 last_known_time: u128,
78 disabled: bool,
79 thread_count: usize,
80 sampler: Box<dyn Sampler>,
81}
82
83impl EventDispatcher {
84 pub(super) fn new(events_configuration: EventsConfiguration) -> Self {
85 Self {
86 outbox: Outbox::new(events_configuration.capacity),
87 context_keys: LruCache::<String, ()>::new(events_configuration.context_keys_capacity),
88 events_configuration,
89 last_known_time: 0,
90 disabled: false,
91 thread_count: 5,
92 sampler: Box::new(ThreadRngSampler::new(rng())),
93 }
94 }
95
96 pub(super) fn start(&mut self, inbox_rx: Receiver<EventDispatcherMessage>) {
97 let reset_context_cache_ticker =
98 tick(self.events_configuration.context_keys_flush_interval);
99 let flush_ticker = tick(self.events_configuration.flush_interval);
100 let (event_result_tx, event_result_rx) = bounded::<EventSenderResult>(self.thread_count);
101
102 let rt = tokio::runtime::Builder::new_multi_thread()
103 .worker_threads(self.thread_count)
104 .enable_io()
105 .enable_time()
106 .build();
107
108 let rt = match rt {
109 Ok(rt) => rt,
110 Err(e) => {
111 error!("Could not start runtime for event sending: {}", e);
112 return;
113 }
114 };
115
116 let (send, recv) = bounded::<()>(1);
117
118 loop {
119 debug!("waiting for a batch to send");
120
121 loop {
122 select! {
123 recv(event_result_rx) -> result => match result {
124 Ok(result) if result.success => {
125 (self.events_configuration.on_success)(&result);
126 self.last_known_time = std::cmp::max(result.time_from_server, self.last_known_time);
127 },
128 Ok(result) if result.must_shutdown => {
129 self.disabled = true;
130 self.outbox.reset();
131 },
132 Ok(_) => continue,
133 Err(e) => {
134 error!("event_result_rx is disconnected. Shutting down dispatcher: {}", e);
135 return;
136 }
137 },
138 recv(reset_context_cache_ticker) -> _ => self.context_keys.clear(),
139 recv(flush_ticker) -> _ => break,
140 recv(inbox_rx) -> result => match result {
141 Ok(EventDispatcherMessage::Flush) => break,
142 Ok(EventDispatcherMessage::EventMessage(event)) => {
143 if !self.disabled {
144 self.process_event(event);
145 }
146 }
147 Ok(EventDispatcherMessage::Close(sender)) => {
148 drop(send);
149 let _ = recv.recv();
151
152 drop(inbox_rx);
158
159 let _ = sender.send(());
160
161 return;
162 }
163 Err(e) => {
164 error!("inbox_rx is disconnected. Shutting down dispatcher: {}", e);
165 return;
166 }
167 }
168 }
169 }
170
171 if self.disabled {
172 continue;
173 }
174
175 if !self.outbox.is_empty() {
176 let payload = self.outbox.get_payload();
177
178 debug!("Sending batch of {} events", payload.len());
179
180 let sender = self.events_configuration.event_sender.clone();
181 let results = event_result_tx.clone();
182 let send = send.clone();
183 rt.spawn(async move {
184 sender.send_event_data(payload, results).await;
185 drop(send);
186 });
187 }
188 }
189 }
190
191 fn process_event(&mut self, event: InputEvent) {
192 match event {
193 InputEvent::MigrationOp(migration_op) => {
194 if self
195 .sampler
196 .sample(migration_op.sampling_ratio.unwrap_or(1))
197 {
198 self.outbox
199 .add_event(OutputEvent::MigrationOp(migration_op));
200 }
201 }
202 InputEvent::FeatureRequest(fre) => {
203 if !fre.exclude_from_summaries {
204 self.outbox.add_to_summary(&fre);
205 }
206
207 let inlined = fre.clone().into_inline_with_anonymous_redaction(
208 self.events_configuration.all_attributes_private,
209 self.events_configuration.private_attributes.clone(),
210 );
211
212 if let Some(context) = self.get_indexable_context(&fre.base) {
213 let base = BaseEvent::new(fre.base.creation_date, context).into_inline(
214 self.events_configuration.all_attributes_private,
215 self.events_configuration.private_attributes.clone(),
216 );
217 self.outbox
218 .add_event(OutputEvent::Index(IndexEvent::from(base)));
219 }
220
221 let now = match SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
222 Ok(time) => time.as_millis(),
223 _ => 0,
224 };
225
226 if let Some(debug_events_until_date) = fre.debug_events_until_date {
227 let time = u128::from(debug_events_until_date);
228 if time > now
229 && time > self.last_known_time
230 && self.sampler.sample(fre.sampling_ratio.unwrap_or(1))
231 {
232 self.outbox
233 .add_event(OutputEvent::Debug(fre.clone().into_inline(
234 self.events_configuration.all_attributes_private,
235 self.events_configuration.private_attributes.clone(),
236 )));
237 }
238 }
239
240 if fre.track_events && self.sampler.sample(fre.sampling_ratio.unwrap_or(1)) {
241 self.outbox.add_event(OutputEvent::FeatureRequest(inlined));
242 }
243 }
244 InputEvent::Identify(mut identify) => {
245 if self.events_configuration.omit_anonymous_contexts {
246 match identify.base.context.without_anonymous_contexts() {
247 Ok(context) => identify.base.context = context,
248 Err(_) => return,
249 }
250 }
251
252 self.notice_context(&identify.base.context);
253 if self.sampler.sample(identify.sampling_ratio.unwrap_or(1)) {
254 self.outbox
255 .add_event(OutputEvent::Identify(identify.into_inline(
256 self.events_configuration.all_attributes_private,
257 self.events_configuration.private_attributes.clone(),
258 )));
259 }
260 }
261 InputEvent::Custom(custom) => {
262 if let Some(context) = self.get_indexable_context(&custom.base) {
263 let base = BaseEvent::new(custom.base.creation_date, context).into_inline(
264 self.events_configuration.all_attributes_private,
265 self.events_configuration.private_attributes.clone(),
266 );
267 self.outbox
268 .add_event(OutputEvent::Index(IndexEvent::from(base)));
269 }
270
271 if self.sampler.sample(custom.sampling_ratio.unwrap_or(1)) {
272 self.outbox.add_event(OutputEvent::Custom(custom));
273 }
274 }
275 }
276 }
277
278 fn get_indexable_context(&mut self, event: &BaseEvent) -> Option<Context> {
279 let context = match self.events_configuration.omit_anonymous_contexts {
280 true => event.context.without_anonymous_contexts(),
281 false => Ok(event.context.clone()),
282 };
283
284 if let Ok(ctx) = context {
285 if self.notice_context(&ctx) {
286 return Some(ctx);
287 }
288
289 return None;
290 }
291
292 None
293 }
294
295 fn notice_context(&mut self, context: &Context) -> bool {
296 let key = context.canonical_key();
297
298 if self.context_keys.get(key).is_none() {
299 trace!("noticing new context {:?}", key);
300 self.context_keys.put(key.to_owned(), ());
301 true
302 } else {
303 trace!("ignoring already-seen context {:?}", key);
304 false
305 }
306 }
307}
308
309#[allow(clippy::large_enum_variant)]
310pub(super) enum EventDispatcherMessage {
311 EventMessage(InputEvent),
312 Flush,
313 Close(Sender<()>),
314}
315
316#[cfg(test)]
317mod tests {
318 use std::thread;
319 use std::time::Duration;
320
321 use super::*;
322 use crate::events::event::{EventFactory, OutputEvent};
323 use crate::events::{create_event_sender, create_events_configuration};
324 use crate::test_common::basic_flag;
325 use launchdarkly_server_sdk_evaluation::{
326 ContextBuilder, Detail, FlagValue, MultiContextBuilder, Reason,
327 };
328 use test_case::test_case;
329
330 #[test]
331 fn get_payload_from_outbox_empties_outbox() {
332 let (event_sender, _) = create_event_sender();
333 let events_configuration =
334 create_events_configuration(event_sender, Duration::from_secs(100));
335 let mut dispatcher = create_dispatcher(events_configuration);
336
337 let context = ContextBuilder::new("context")
338 .build()
339 .expect("Failed to create context");
340 let event_factory = EventFactory::new(true);
341 dispatcher.process_event(event_factory.new_identify(context));
342 let _ = dispatcher.outbox.get_payload();
343
344 assert!(dispatcher.outbox.is_empty());
345 }
346
347 #[test]
348 fn dispatcher_ignores_events_over_capacity() {
349 let (event_sender, _) = create_event_sender();
350 let events_configuration =
351 create_events_configuration(event_sender, Duration::from_secs(100));
352 let mut dispatcher = create_dispatcher(events_configuration);
353
354 let context = ContextBuilder::new("context")
355 .build()
356 .expect("Failed to create context");
357 let event_factory = EventFactory::new(true);
358
359 for _ in 0..10 {
360 dispatcher.process_event(event_factory.new_identify(context.clone()));
361 }
362
363 assert_eq!(5, dispatcher.outbox.events.len());
364 assert!(dispatcher
365 .outbox
366 .events
367 .iter()
368 .all(|event| event.kind() == "identify"));
369 assert_eq!(1, dispatcher.context_keys.len());
370 }
371
372 #[test]
373 fn dispatcher_handles_feature_request_events_correctly() {
374 let (event_sender, _) = create_event_sender();
375 let events_configuration =
376 create_events_configuration(event_sender, Duration::from_secs(100));
377 let mut dispatcher = create_dispatcher(events_configuration);
378
379 let context = ContextBuilder::new("context")
380 .build()
381 .expect("Failed to create context");
382 let mut flag = basic_flag("flag");
383 flag.debug_events_until_date = Some(64_060_606_800_000);
384 flag.track_events = true;
385
386 let detail = Detail {
387 value: Some(FlagValue::from(false)),
388 variation_index: Some(1),
389 reason: Reason::Fallthrough {
390 in_experiment: false,
391 },
392 };
393
394 let event_factory = EventFactory::new(true);
395 let feature_request_event = event_factory.new_eval_event(
396 &flag.key,
397 context,
398 &flag,
399 detail,
400 FlagValue::from(false),
401 None,
402 );
403
404 dispatcher.process_event(feature_request_event);
405 assert_eq!(3, dispatcher.outbox.events.len());
406 assert_eq!("index", dispatcher.outbox.events[0].kind());
407 assert_eq!("debug", dispatcher.outbox.events[1].kind());
408 assert_eq!("feature", dispatcher.outbox.events[2].kind());
409 assert_eq!(1, dispatcher.context_keys.len());
410 assert_eq!(1, dispatcher.outbox.summary.features.len());
411 }
412
413 #[test]
414 fn dispatcher_strips_anonymous_contexts_from_index_for_feature_request_events() {
415 let (event_sender, _) = create_event_sender();
416 let mut events_configuration =
417 create_events_configuration(event_sender, Duration::from_secs(100));
418 events_configuration.omit_anonymous_contexts = true;
419 let mut dispatcher = create_dispatcher(events_configuration);
420
421 let context = ContextBuilder::new("context")
422 .anonymous(true)
423 .build()
424 .expect("Failed to create context");
425 let mut flag = basic_flag("flag");
426 flag.debug_events_until_date = Some(64_060_606_800_000);
427 flag.track_events = true;
428
429 let detail = Detail {
430 value: Some(FlagValue::from(false)),
431 variation_index: Some(1),
432 reason: Reason::Fallthrough {
433 in_experiment: false,
434 },
435 };
436
437 let event_factory = EventFactory::new(true);
438 let feature_request_event = event_factory.new_eval_event(
439 &flag.key,
440 context,
441 &flag,
442 detail,
443 FlagValue::from(false),
444 None,
445 );
446
447 dispatcher.process_event(feature_request_event);
448 assert_eq!(2, dispatcher.outbox.events.len());
449 assert_eq!("debug", dispatcher.outbox.events[0].kind());
450 assert_eq!("feature", dispatcher.outbox.events[1].kind());
451 assert_eq!(0, dispatcher.context_keys.len());
452 assert_eq!(1, dispatcher.outbox.summary.features.len());
453 }
454
455 #[test]
456 fn dispatcher_ignores_feature_events_with_0_sampling_ratio() {
457 let (event_sender, _) = create_event_sender();
458 let events_configuration =
459 create_events_configuration(event_sender, Duration::from_secs(100));
460 let mut dispatcher = create_dispatcher(events_configuration);
461
462 let context = ContextBuilder::new("context")
463 .build()
464 .expect("Failed to create context");
465 let mut flag = basic_flag("flag");
466 flag.sampling_ratio = Some(0);
467 flag.debug_events_until_date = Some(64_060_606_800_000);
468 flag.track_events = true;
469
470 let detail = Detail {
471 value: Some(FlagValue::from(false)),
472 variation_index: Some(1),
473 reason: Reason::Fallthrough {
474 in_experiment: false,
475 },
476 };
477
478 let event_factory = EventFactory::new(true);
479 let feature_request_event = event_factory.new_eval_event(
480 &flag.key,
481 context,
482 &flag,
483 detail,
484 FlagValue::from(false),
485 None,
486 );
487
488 dispatcher.process_event(feature_request_event);
489 assert_eq!(1, dispatcher.outbox.events.len());
490 assert_eq!("index", dispatcher.outbox.events[0].kind());
491 assert_eq!(1, dispatcher.context_keys.len());
492 assert_eq!(1, dispatcher.outbox.summary.features.len());
493 }
494
495 #[test]
496 fn dispatcher_can_exclude_feature_event_from_summaries() {
497 let (event_sender, _) = create_event_sender();
498 let events_configuration =
499 create_events_configuration(event_sender, Duration::from_secs(100));
500 let mut dispatcher = create_dispatcher(events_configuration);
501
502 let context = ContextBuilder::new("context")
503 .build()
504 .expect("Failed to create context");
505 let mut flag = basic_flag("flag");
506 flag.exclude_from_summaries = true;
507 flag.debug_events_until_date = Some(64_060_606_800_000);
508 flag.track_events = true;
509
510 let detail = Detail {
511 value: Some(FlagValue::from(false)),
512 variation_index: Some(1),
513 reason: Reason::Fallthrough {
514 in_experiment: false,
515 },
516 };
517
518 let event_factory = EventFactory::new(true);
519 let feature_request_event = event_factory.new_eval_event(
520 &flag.key,
521 context,
522 &flag,
523 detail,
524 FlagValue::from(false),
525 None,
526 );
527
528 dispatcher.process_event(feature_request_event);
529 assert_eq!(3, dispatcher.outbox.events.len());
530 assert_eq!("index", dispatcher.outbox.events[0].kind());
531 assert_eq!("debug", dispatcher.outbox.events[1].kind());
532 assert_eq!("feature", dispatcher.outbox.events[2].kind());
533 assert_eq!(1, dispatcher.context_keys.len());
534 assert_eq!(0, dispatcher.outbox.summary.features.len());
535 }
536
537 #[test_case(0, 64_060_606_800_000, vec!["debug", "index", "summary"])]
538 #[test_case(64_060_606_800_000, 64_060_606_800_000, vec!["index", "summary"])]
539 #[test_case(64_060_606_800_001, 64_060_606_800_000, vec!["index", "summary"])]
540 fn sending_feature_event_emits_debug_event_correctly(
541 last_known_time: u128,
542 debug_events_until_date: u64,
543 expected_event_types: Vec<&str>,
544 ) {
545 let mut flag = basic_flag("flag");
546 flag.debug_events_until_date = Some(debug_events_until_date);
547 let context = ContextBuilder::new("foo")
548 .build()
549 .expect("Failed to create context");
550 let detail = Detail {
551 value: Some(FlagValue::from(false)),
552 variation_index: Some(1),
553 reason: Reason::Fallthrough {
554 in_experiment: false,
555 },
556 };
557
558 let event_factory = EventFactory::new(true);
559 let feature_request = event_factory.new_eval_event(
560 &flag.key,
561 context,
562 &flag,
563 detail,
564 FlagValue::from(false),
565 None,
566 );
567
568 let (event_sender, event_rx) = create_event_sender();
569 let events_configuration =
570 create_events_configuration(event_sender, Duration::from_secs(100));
571 let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);
572
573 let dispatcher_handle = thread::Builder::new()
574 .spawn(move || {
575 let mut dispatcher = create_dispatcher(events_configuration);
576 dispatcher.last_known_time = last_known_time;
577 dispatcher.start(inbox_rx)
578 })
579 .unwrap();
580
581 inbox_tx
582 .send(EventDispatcherMessage::EventMessage(feature_request))
583 .expect("event send failed");
584 inbox_tx
585 .send(EventDispatcherMessage::Flush)
586 .expect("flush failed");
587
588 let (tx, rx) = bounded(1);
589 inbox_tx
590 .send(EventDispatcherMessage::Close(tx))
591 .expect("failed to close");
592 rx.recv().expect("failed to notify on close");
593 dispatcher_handle.join().unwrap();
594
595 let events = event_rx.iter().collect::<Vec<OutputEvent>>();
596 assert_eq!(expected_event_types.len(), events.len());
597
598 let kinds = events.iter().map(|event| event.kind()).collect::<Vec<_>>();
599
600 for event_type in &expected_event_types {
601 assert!(kinds.contains(event_type));
602 }
603 }
604
605 #[test]
606 fn dispatcher_only_notices_identity_event_once() {
607 let (event_sender, _) = create_event_sender();
608 let events_configuration =
609 create_events_configuration(event_sender, Duration::from_secs(100));
610 let mut dispatcher = create_dispatcher(events_configuration);
611
612 let context = ContextBuilder::new("context")
613 .build()
614 .expect("Failed to create context");
615 let event_factory = EventFactory::new(true);
616
617 dispatcher.process_event(event_factory.new_identify(context.clone()));
618 dispatcher.process_event(event_factory.new_identify(context));
619 assert_eq!(2, dispatcher.outbox.events.len());
620 assert_eq!("identify", dispatcher.outbox.events[0].kind());
621 assert_eq!("identify", dispatcher.outbox.events[1].kind());
622 assert_eq!(1, dispatcher.context_keys.len());
623 }
624
625 #[test]
626 fn dispatcher_can_ignore_identify_if_anonymous() {
627 let (event_sender, _) = create_event_sender();
628 let mut events_configuration =
629 create_events_configuration(event_sender, Duration::from_secs(100));
630 events_configuration.omit_anonymous_contexts = true;
631 let mut dispatcher = create_dispatcher(events_configuration);
632
633 let context = ContextBuilder::new("context")
634 .anonymous(true)
635 .build()
636 .expect("Failed to create context");
637 let event_factory = EventFactory::new(true);
638
639 dispatcher.process_event(event_factory.new_identify(context));
640 assert_eq!(0, dispatcher.outbox.events.len());
641 assert_eq!(0, dispatcher.context_keys.len());
642 }
643
644 #[test]
645 fn dispatcher_strips_anon_contexts_from_multi_kind_identify() {
646 let (event_sender, _) = create_event_sender();
647 let mut events_configuration =
648 create_events_configuration(event_sender, Duration::from_secs(100));
649 events_configuration.omit_anonymous_contexts = true;
650 let mut dispatcher = create_dispatcher(events_configuration);
651
652 let user_context = ContextBuilder::new("user")
653 .anonymous(true)
654 .build()
655 .expect("Failed to create context");
656 let org_context = ContextBuilder::new("org")
657 .kind("org")
658 .build()
659 .expect("Failed to create context");
660 let context = MultiContextBuilder::new()
661 .add_context(user_context)
662 .add_context(org_context)
663 .build()
664 .expect("Failed to create context");
665
666 let event_factory = EventFactory::new(true);
667
668 dispatcher.process_event(event_factory.new_identify(context));
669 assert_eq!(1, dispatcher.outbox.events.len());
670 assert_eq!("identify", dispatcher.outbox.events[0].kind());
671 assert_eq!(1, dispatcher.context_keys.len());
672
673 if let OutputEvent::Identify(identify) = &dispatcher.outbox.events[0] {
674 assert_eq!("org:org", identify.base.context.canonical_key());
675 } else {
676 panic!("Expected an identify event");
677 }
678 }
679
680 #[test]
681 fn dispatcher_adds_index_on_custom_event() {
682 let (event_sender, _) = create_event_sender();
683 let events_configuration =
684 create_events_configuration(event_sender, Duration::from_secs(100));
685 let mut dispatcher = create_dispatcher(events_configuration);
686
687 let context = ContextBuilder::new("context")
688 .build()
689 .expect("Failed to create context");
690 let event_factory = EventFactory::new(true);
691 let custom_event = event_factory
692 .new_custom(context, "context", None, "")
693 .expect("failed to make new custom event");
694
695 dispatcher.process_event(custom_event);
696 assert_eq!(2, dispatcher.outbox.events.len());
697 assert_eq!("index", dispatcher.outbox.events[0].kind());
698 assert_eq!("custom", dispatcher.outbox.events[1].kind());
699 assert_eq!(1, dispatcher.context_keys.len());
700 }
701
702 #[test]
703 fn dispatcher_can_strip_anonymous_from_index_events() {
704 let (event_sender, _) = create_event_sender();
705 let mut events_configuration =
706 create_events_configuration(event_sender, Duration::from_secs(100));
707 events_configuration.omit_anonymous_contexts = true;
708 let mut dispatcher = create_dispatcher(events_configuration);
709
710 let context = ContextBuilder::new("context")
711 .anonymous(true)
712 .build()
713 .expect("Failed to create context");
714 let event_factory = EventFactory::new(true);
715 let custom_event = event_factory
716 .new_custom(context, "context", None, "")
717 .expect("failed to make new custom event");
718
719 dispatcher.process_event(custom_event);
720 assert_eq!(1, dispatcher.outbox.events.len());
721 assert_eq!("custom", dispatcher.outbox.events[0].kind());
722 assert_eq!(0, dispatcher.context_keys.len());
723 }
724
725 #[test]
726 fn dispatcher_can_strip_anonymous_from_index_events_with_multi_kinds() {
727 let (event_sender, _) = create_event_sender();
728 let mut events_configuration =
729 create_events_configuration(event_sender, Duration::from_secs(100));
730 events_configuration.omit_anonymous_contexts = true;
731 let mut dispatcher = create_dispatcher(events_configuration);
732
733 let user_context = ContextBuilder::new("user")
734 .anonymous(true)
735 .build()
736 .expect("Failed to create context");
737 let org_context = ContextBuilder::new("org")
738 .kind("org")
739 .build()
740 .expect("Failed to create context");
741 let context = MultiContextBuilder::new()
742 .add_context(user_context)
743 .add_context(org_context)
744 .build()
745 .expect("Failed to create context");
746 let event_factory = EventFactory::new(true);
747 let custom_event = event_factory
748 .new_custom(context, "context", None, "")
749 .expect("failed to make new custom event");
750
751 dispatcher.process_event(custom_event);
752 assert_eq!(2, dispatcher.outbox.events.len());
753 assert_eq!("index", dispatcher.outbox.events[0].kind());
754 assert_eq!("custom", dispatcher.outbox.events[1].kind());
755 assert_eq!(1, dispatcher.context_keys.len());
756 }
757
758 #[test]
759 fn can_process_events_successfully() {
760 let (event_sender, event_rx) = create_event_sender();
761 let events_configuration =
762 create_events_configuration(event_sender, Duration::from_secs(100));
763 let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);
764
765 let dispatcher_handle = thread::Builder::new()
766 .spawn(move || {
767 let mut dispatcher = create_dispatcher(events_configuration);
768 dispatcher.start(inbox_rx)
769 })
770 .unwrap();
771
772 let context = ContextBuilder::new("context")
773 .build()
774 .expect("Failed to create context");
775 let event_factory = EventFactory::new(true);
776
777 inbox_tx
778 .send(EventDispatcherMessage::EventMessage(
779 event_factory.new_identify(context),
780 ))
781 .expect("event send failed");
782 inbox_tx
783 .send(EventDispatcherMessage::Flush)
784 .expect("flush failed");
785
786 let (tx, rx) = bounded(1);
787 inbox_tx
788 .send(EventDispatcherMessage::Close(tx))
789 .expect("failed to close");
790 rx.recv().expect("failed to notify on close");
791 dispatcher_handle.join().unwrap();
792
793 assert_eq!(event_rx.iter().count(), 1);
794 }
795
796 #[test]
797 fn dispatcher_flushes_periodically() {
798 let (event_sender, event_rx) = create_event_sender();
799 let events_configuration =
800 create_events_configuration(event_sender, Duration::from_millis(200));
801 let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);
802
803 let _ = thread::Builder::new()
804 .spawn(move || {
805 let mut dispatcher = create_dispatcher(events_configuration);
806 dispatcher.start(inbox_rx)
807 })
808 .unwrap();
809
810 let context = ContextBuilder::new("context")
811 .build()
812 .expect("Failed to create context");
813 let event_factory = EventFactory::new(true);
814
815 inbox_tx
816 .send(EventDispatcherMessage::EventMessage(
817 event_factory.new_identify(context),
818 ))
819 .expect("event send failed");
820
821 thread::sleep(Duration::from_millis(300));
822 assert_eq!(event_rx.try_iter().count(), 1);
823 }
824
825 fn create_dispatcher(events_configuration: EventsConfiguration) -> EventDispatcher {
826 EventDispatcher::new(events_configuration)
827 }
828}