launchdarkly_server_sdk/events/
processor.rs
1use crossbeam_channel::{bounded, Sender};
2use std::sync::Once;
3use std::thread;
4use thiserror::Error;
5
6use super::dispatcher::{EventDispatcher, EventDispatcherMessage};
7use super::event::InputEvent;
8use super::EventsConfiguration;
9
10#[non_exhaustive]
11#[derive(Debug, Error)]
12pub enum EventProcessorError {
13 #[error(transparent)]
14 SpawnFailed(#[from] std::io::Error),
15}
16
17pub trait EventProcessor: Send + Sync {
20 fn send(&self, event: InputEvent);
24
25 fn flush(&self);
29
30 fn close(&self);
34}
35
/// An event processor that holds no state.
///
/// `Default` is derived so the type can be built wherever a `Default` bound is required;
/// `NullEventProcessor::default()` and [`NullEventProcessor::new`] produce the same value.
#[derive(Default)]
pub struct NullEventProcessor {}

impl NullEventProcessor {
    /// Creates a new `NullEventProcessor`.
    pub fn new() -> Self {
        Self {}
    }
}
43
44impl EventProcessor for NullEventProcessor {
45 fn send(&self, _: InputEvent) {}
46 fn flush(&self) {}
47 fn close(&self) {}
48}
49
50pub struct EventProcessorImpl {
51 inbox_tx: Sender<EventDispatcherMessage>,
52 inbox_full_once: Once,
53}
54
55impl EventProcessorImpl {
56 pub fn new(events_configuration: EventsConfiguration) -> Result<Self, EventProcessorError> {
57 let (inbox_tx, inbox_rx) = bounded(events_configuration.capacity);
58 let dispatch_start = move || {
59 let mut dispatcher = EventDispatcher::new(events_configuration);
60 dispatcher.start(inbox_rx)
61 };
62
63 match thread::Builder::new().spawn(dispatch_start) {
64 Ok(_) => Ok(Self {
65 inbox_tx,
66 inbox_full_once: Once::new(),
67 }),
68 Err(e) => Err(EventProcessorError::SpawnFailed(e)),
69 }
70 }
71}
72
73impl EventProcessor for EventProcessorImpl {
74 fn send(&self, event: InputEvent) {
75 if self
76 .inbox_tx
77 .try_send(EventDispatcherMessage::EventMessage(event))
78 .is_err()
79 {
80 self.inbox_full_once.call_once(|| {
81 warn!("Events are being produced faster than they can be processed; some events will be dropped")
82 });
83 }
84 }
85
86 fn flush(&self) {
87 let _ = self.inbox_tx.try_send(EventDispatcherMessage::Flush);
88 }
89
90 fn close(&self) {
91 let (sender, receiver) = bounded::<()>(1);
92
93 if self.inbox_tx.send(EventDispatcherMessage::Flush).is_err() {
94 error!("Failed to send final flush message. Cannot stop event processor");
95 return;
96 }
97
98 if self
99 .inbox_tx
100 .send(EventDispatcherMessage::Close(sender))
101 .is_err()
102 {
103 error!("Failed to send close message. Cannot stop event processor");
104 return;
105 }
106
107 let _ = receiver.recv();
108 }
109}
110
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use launchdarkly_server_sdk_evaluation::{ContextBuilder, Detail, Flag, FlagValue, Reason};
    use test_case::test_case;

    use crate::{
        events::{
            create_event_sender, create_events_configuration,
            event::{EventFactory, OutputEvent},
        },
        test_common::basic_flag,
    };

    use super::*;

    /// Closing an already-closed processor must return instead of hanging.
    #[test]
    fn calling_close_on_processor_twice_returns() {
        // The receiving half is deliberately not bound, so it is dropped right away.
        let (event_sender, _) = create_event_sender();
        let config = create_events_configuration(event_sender, Duration::from_secs(100));
        let processor = EventProcessorImpl::new(config).expect("failed to start ep");

        processor.close();
        processor.close();
    }

    /// A single evaluation yields a "feature" event only when the flag has `track_events` set.
    #[test_case(true, vec!["index", "feature", "summary"])]
    #[test_case(false, vec!["index", "summary"])]
    fn sending_feature_event_emits_correct_events(
        flag_track_events: bool,
        expected_event_types: Vec<&str>,
    ) {
        let flag = {
            let mut configured = basic_flag("flag");
            configured.track_events = flag_track_events;
            configured
        };
        let context = ContextBuilder::new("foo")
            .build()
            .expect("Failed to create context");

        let factory = EventFactory::new(true);
        let eval_event = factory.new_eval_event(
            &flag.key,
            context,
            &flag,
            Detail {
                value: Some(FlagValue::from(false)),
                variation_index: Some(1),
                reason: Reason::Fallthrough {
                    in_experiment: false,
                },
            },
            FlagValue::from(false),
            None,
        );

        let (event_sender, event_rx) = create_event_sender();
        let config = create_events_configuration(event_sender, Duration::from_secs(100));
        let processor = EventProcessorImpl::new(config).expect("failed to start ep");

        processor.send(eval_event);
        processor.flush();
        processor.close();

        let emitted: Vec<OutputEvent> = event_rx.iter().collect();
        assert_eq!(expected_event_types.len(), emitted.len());

        for expected_kind in &expected_event_types {
            assert!(emitted.iter().any(|e| e.kind() == *expected_kind));
        }
    }

    /// Sends one evaluation per path through the flag: a rule with `trackEvents` on, a rule with
    /// it off, and the fallthrough with `trackEventsFallthrough` on.
    #[test]
    fn sending_feature_event_with_rule_track_events_emits_feature_and_summary() {
        let flag: Flag = serde_json::from_str(
            r#"{
                "key": "with_rule",
                "on": true,
                "targets": [],
                "prerequisites": [],
                "rules": [
                    {
                        "id": "rule-0",
                        "clauses": [{
                            "attribute": "key",
                            "negate": false,
                            "op": "matches",
                            "values": ["do-track"]
                        }],
                        "trackEvents": true,
                        "variation": 1
                    },
                    {
                        "id": "rule-1",
                        "clauses": [{
                            "attribute": "key",
                            "negate": false,
                            "op": "matches",
                            "values": ["no-track"]
                        }],
                        "trackEvents": false,
                        "variation": 1
                    }
                ],
                "fallthrough": {"variation": 0},
                "trackEventsFallthrough": true,
                "offVariation": 0,
                "clientSideAvailability": {
                    "usingMobileKey": false,
                    "usingEnvironmentId": false
                },
                "salt": "kosher",
                "version": 2,
                "variations": [false, true]
            }"#,
        )
        .expect("flag should parse");

        let context_for = |key: &str| {
            ContextBuilder::new(key)
                .build()
                .expect("Failed to create context")
        };

        let factory = EventFactory::new(true);
        let eval_event = |context, detail| {
            factory.new_eval_event(
                &flag.key,
                context,
                &flag,
                detail,
                FlagValue::from(false),
                None,
            )
        };

        let tracked_rule_event = eval_event(
            context_for("do-track"),
            Detail {
                value: Some(FlagValue::from(true)),
                variation_index: Some(1),
                reason: Reason::RuleMatch {
                    rule_index: 0,
                    rule_id: "rule-0".into(),
                    in_experiment: false,
                },
            },
        );
        let untracked_rule_event = eval_event(
            context_for("no-track"),
            Detail {
                value: Some(FlagValue::from(true)),
                variation_index: Some(1),
                reason: Reason::RuleMatch {
                    rule_index: 1,
                    rule_id: "rule-1".into(),
                    in_experiment: false,
                },
            },
        );
        let fallthrough_event = eval_event(
            context_for("foo"),
            Detail {
                value: Some(FlagValue::from(false)),
                variation_index: Some(0),
                reason: Reason::Fallthrough {
                    in_experiment: false,
                },
            },
        );

        let (event_sender, event_rx) = create_event_sender();
        let config = create_events_configuration(event_sender, Duration::from_secs(100));
        let processor = EventProcessorImpl::new(config).expect("failed to start ep");

        processor.send(tracked_rule_event);
        processor.send(untracked_rule_event);
        processor.send(fallthrough_event);

        processor.flush();
        processor.close();

        let emitted: Vec<OutputEvent> = event_rx.iter().collect();
        let count_of = |kind: &str| emitted.iter().filter(|e| e.kind() == kind).count();

        // NOTE(review): the sum presumably reads as tracked rule -> feature + index, untracked
        // rule -> index, fallthrough -> feature + index, plus one summary; confirm against the
        // dispatcher.
        assert_eq!(emitted.len(), 2 + 1 + 2 + 1);
        assert_eq!(count_of("feature"), 2);
        assert!(emitted.iter().any(|e| e.kind() == "index"));
        assert!(emitted.iter().any(|e| e.kind() == "summary"));
    }

    /// Two evaluations for the same context produce one "index" event and one "summary" event.
    #[test]
    fn feature_events_dedupe_index_events() {
        let flag = basic_flag("flag");
        let context = ContextBuilder::new("bar")
            .build()
            .expect("Failed to create context");

        let factory = EventFactory::new(true);
        let eval_event = factory.new_eval_event(
            &flag.key,
            context,
            &flag,
            Detail {
                value: Some(FlagValue::from(false)),
                variation_index: Some(1),
                reason: Reason::Fallthrough {
                    in_experiment: false,
                },
            },
            FlagValue::from(false),
            None,
        );

        let (event_sender, event_rx) = create_event_sender();
        let config = create_events_configuration(event_sender, Duration::from_secs(100));
        let processor = EventProcessorImpl::new(config).expect("failed to start ep");

        processor.send(eval_event.clone());
        processor.send(eval_event);
        processor.flush();
        processor.close();

        let emitted: Vec<OutputEvent> = event_rx.iter().collect();
        let count_of = |kind: &str| emitted.iter().filter(|e| e.kind() == kind).count();

        assert_eq!(emitted.len(), 2);
        assert_eq!(count_of("index"), 1);
        assert_eq!(count_of("summary"), 1);
    }
}