launchdarkly_server_sdk/events/
processor.rs

1use crossbeam_channel::{bounded, Sender};
2use std::sync::Once;
3use std::thread;
4use thiserror::Error;
5
6use super::dispatcher::{EventDispatcher, EventDispatcherMessage};
7use super::event::InputEvent;
8use super::EventsConfiguration;
9
10#[non_exhaustive]
11#[derive(Debug, Error)]
12pub enum EventProcessorError {
13    #[error(transparent)]
14    SpawnFailed(#[from] std::io::Error),
15}
16
17/// Trait for the component that buffers analytics events and sends them to LaunchDarkly.
18/// This component can be replaced for testing purposes.
19pub trait EventProcessor: Send + Sync {
20    /// Records an InputEvent asynchronously. Depending on the feature flag properties and event
21    /// properties, this may be transmitted to the events service as an individual event, or may
22    /// only be added into summary data.
23    fn send(&self, event: InputEvent);
24
25    /// Specifies that any buffered events should be sent as soon as possible, rather than waiting
26    /// for the next flush interval. This method is asynchronous, so events still may not be sent
27    /// until a later time.
28    fn flush(&self);
29
30    /// Shuts down all event processor activity, after first ensuring that all events have been
31    /// delivered. Subsequent calls to [EventProcessor::send] or [EventProcessor::flush] will be
32    /// ignored.
33    fn close(&self);
34}
35
/// Stateless event processor placeholder.
///
/// The type carries no data, so every instance is interchangeable; `Default` and `Debug` are
/// derived so it composes with generic code and diagnostics like any other public type.
#[derive(Debug, Default)]
pub struct NullEventProcessor {}

impl NullEventProcessor {
    /// Creates a new `NullEventProcessor`. Equivalent to [NullEventProcessor::default].
    pub fn new() -> Self {
        Self {}
    }
}
43
44impl EventProcessor for NullEventProcessor {
45    fn send(&self, _: InputEvent) {}
46    fn flush(&self) {}
47    fn close(&self) {}
48}
49
50pub struct EventProcessorImpl {
51    inbox_tx: Sender<EventDispatcherMessage>,
52    inbox_full_once: Once,
53}
54
55impl EventProcessorImpl {
56    pub fn new(events_configuration: EventsConfiguration) -> Result<Self, EventProcessorError> {
57        let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);
58        let dispatch_start = move || {
59            let mut dispatcher = EventDispatcher::new(events_configuration);
60            dispatcher.start(inbox_rx)
61        };
62
63        match thread::Builder::new().spawn(dispatch_start) {
64            Ok(_) => Ok(Self {
65                inbox_tx,
66                inbox_full_once: Once::new(),
67            }),
68            Err(e) => Err(EventProcessorError::SpawnFailed(e)),
69        }
70    }
71}
72
73impl EventProcessor for EventProcessorImpl {
74    fn send(&self, event: InputEvent) {
75        if self
76            .inbox_tx
77            .try_send(EventDispatcherMessage::EventMessage(event))
78            .is_err()
79        {
80            self.inbox_full_once.call_once(|| {
81                warn!("Events are being produced faster than they can be processed; some events will be dropped")
82            });
83        }
84    }
85
86    fn flush(&self) {
87        let _ = self.inbox_tx.try_send(EventDispatcherMessage::Flush);
88    }
89
90    fn close(&self) {
91        let (sender, receiver) = bounded::<()>(1);
92
93        if self.inbox_tx.send(EventDispatcherMessage::Flush).is_err() {
94            error!("Failed to send final flush message. Cannot stop event processor");
95            return;
96        }
97
98        if self
99            .inbox_tx
100            .send(EventDispatcherMessage::Close(sender))
101            .is_err()
102        {
103            error!("Failed to send close message. Cannot stop event processor");
104            return;
105        }
106
107        let _ = receiver.recv();
108    }
109}
110
111#[cfg(test)]
112mod tests {
113    use std::time::Duration;
114
115    use launchdarkly_server_sdk_evaluation::{ContextBuilder, Detail, Flag, FlagValue, Reason};
116    use test_case::test_case;
117
118    use crate::{
119        events::{
120            create_event_sender, create_events_configuration,
121            event::{EventFactory, OutputEvent},
122        },
123        test_common::basic_flag,
124    };
125
126    use super::*;
127
128    #[test]
129    fn calling_close_on_processor_twice_returns() {
130        let (event_sender, _) = create_event_sender();
131        let events_configuration =
132            create_events_configuration(event_sender, Duration::from_secs(100));
133        let event_processor =
134            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
135        event_processor.close();
136        event_processor.close();
137    }
138
139    #[test_case(true, vec!["index", "feature", "summary"])]
140    #[test_case(false, vec!["index", "summary"])]
141    fn sending_feature_event_emits_correct_events(
142        flag_track_events: bool,
143        expected_event_types: Vec<&str>,
144    ) {
145        let mut flag = basic_flag("flag");
146        flag.track_events = flag_track_events;
147        let context = ContextBuilder::new("foo")
148            .build()
149            .expect("Failed to create context");
150        let detail = Detail {
151            value: Some(FlagValue::from(false)),
152            variation_index: Some(1),
153            reason: Reason::Fallthrough {
154                in_experiment: false,
155            },
156        };
157
158        let event_factory = EventFactory::new(true);
159        let feature_request = event_factory.new_eval_event(
160            &flag.key,
161            context,
162            &flag,
163            detail,
164            FlagValue::from(false),
165            None,
166        );
167
168        let (event_sender, event_rx) = create_event_sender();
169        let events_configuration =
170            create_events_configuration(event_sender, Duration::from_secs(100));
171        let event_processor =
172            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
173        event_processor.send(feature_request);
174        event_processor.flush();
175        event_processor.close();
176
177        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
178        assert_eq!(expected_event_types.len(), events.len());
179
180        for event_type in expected_event_types {
181            assert!(events.iter().any(|e| e.kind() == event_type));
182        }
183    }
184
185    #[test]
186    fn sending_feature_event_with_rule_track_events_emits_feature_and_summary() {
187        let flag: Flag = serde_json::from_str(
188            r#"{
189                 "key": "with_rule",
190                 "on": true,
191                 "targets": [],
192                 "prerequisites": [],
193                 "rules": [
194                   {
195                     "id": "rule-0",
196                     "clauses": [{
197                       "attribute": "key",
198                       "negate": false,
199                       "op": "matches",
200                       "values": ["do-track"]
201                     }],
202                     "trackEvents": true,
203                     "variation": 1
204                   },
205                   {
206                     "id": "rule-1",
207                     "clauses": [{
208                       "attribute": "key",
209                       "negate": false,
210                       "op": "matches",
211                       "values": ["no-track"]
212                     }],
213                     "trackEvents": false,
214                     "variation": 1
215                   }
216                 ],
217                 "fallthrough": {"variation": 0},
218                 "trackEventsFallthrough": true,
219                 "offVariation": 0,
220                 "clientSideAvailability": {
221                   "usingMobileKey": false,
222                   "usingEnvironmentId": false
223                 },
224                 "salt": "kosher",
225                 "version": 2,
226                 "variations": [false, true]
227               }"#,
228        )
229        .expect("flag should parse");
230
231        let context_track_rule = ContextBuilder::new("do-track")
232            .build()
233            .expect("Failed to create context");
234        let context_notrack_rule = ContextBuilder::new("no-track")
235            .build()
236            .expect("Failed to create context");
237        let context_fallthrough = ContextBuilder::new("foo")
238            .build()
239            .expect("Failed to create context");
240
241        let detail_track_rule = Detail {
242            value: Some(FlagValue::from(true)),
243            variation_index: Some(1),
244            reason: Reason::RuleMatch {
245                rule_index: 0,
246                rule_id: "rule-0".into(),
247                in_experiment: false,
248            },
249        };
250        let detail_notrack_rule = Detail {
251            value: Some(FlagValue::from(true)),
252            variation_index: Some(1),
253            reason: Reason::RuleMatch {
254                rule_index: 1,
255                rule_id: "rule-1".into(),
256                in_experiment: false,
257            },
258        };
259        let detail_fallthrough = Detail {
260            value: Some(FlagValue::from(false)),
261            variation_index: Some(0),
262            reason: Reason::Fallthrough {
263                in_experiment: false,
264            },
265        };
266
267        let event_factory = EventFactory::new(true);
268        let fre_track_rule = event_factory.new_eval_event(
269            &flag.key,
270            context_track_rule,
271            &flag,
272            detail_track_rule,
273            FlagValue::from(false),
274            None,
275        );
276        let fre_notrack_rule = event_factory.new_eval_event(
277            &flag.key,
278            context_notrack_rule,
279            &flag,
280            detail_notrack_rule,
281            FlagValue::from(false),
282            None,
283        );
284        let fre_fallthrough = event_factory.new_eval_event(
285            &flag.key,
286            context_fallthrough,
287            &flag,
288            detail_fallthrough,
289            FlagValue::from(false),
290            None,
291        );
292
293        let (event_sender, event_rx) = create_event_sender();
294        let events_configuration =
295            create_events_configuration(event_sender, Duration::from_secs(100));
296        let event_processor =
297            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
298
299        for fre in [fre_track_rule, fre_notrack_rule, fre_fallthrough] {
300            event_processor.send(fre);
301        }
302
303        event_processor.flush();
304        event_processor.close();
305
306        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
307
308        // detail_track_rule -> feature + index, detail_notrack_rule -> index, detail_fallthrough -> feature + index, 1 summary
309        assert_eq!(events.len(), 2 + 1 + 2 + 1);
310        assert_eq!(
311            events
312                .iter()
313                .filter(|event| event.kind() == "feature")
314                .count(),
315            2
316        );
317        assert!(events.iter().any(|e| e.kind() == "index"));
318        assert!(events.iter().any(|e| e.kind() == "summary"));
319    }
320
321    #[test]
322    fn feature_events_dedupe_index_events() {
323        let flag = basic_flag("flag");
324        let context = ContextBuilder::new("bar")
325            .build()
326            .expect("Failed to create context");
327        let detail = Detail {
328            value: Some(FlagValue::from(false)),
329            variation_index: Some(1),
330            reason: Reason::Fallthrough {
331                in_experiment: false,
332            },
333        };
334
335        let event_factory = EventFactory::new(true);
336        let feature_request = event_factory.new_eval_event(
337            &flag.key,
338            context,
339            &flag,
340            detail,
341            FlagValue::from(false),
342            None,
343        );
344
345        let (event_sender, event_rx) = create_event_sender();
346        let events_configuration =
347            create_events_configuration(event_sender, Duration::from_secs(100));
348        let event_processor =
349            EventProcessorImpl::new(events_configuration).expect("failed to start ep");
350        event_processor.send(feature_request.clone());
351        event_processor.send(feature_request);
352        event_processor.flush();
353        event_processor.close();
354
355        let events = event_rx.iter().collect::<Vec<OutputEvent>>();
356
357        assert_eq!(events.len(), 2);
358
359        assert_eq!(
360            events
361                .iter()
362                .filter(|event| event.kind() == "index")
363                .count(),
364            1
365        );
366
367        assert_eq!(
368            events
369                .iter()
370                .filter(|event| event.kind() == "summary")
371                .count(),
372            1
373        );
374    }
375}