eventsource_client/
event_parser.rs

1use std::{collections::VecDeque, convert::TryFrom, str::from_utf8};
2
3use hyper::body::Bytes;
4use log::{debug, log_enabled, trace};
5use pin_project::pin_project;
6
7use crate::response::Response;
8
9use super::error::{Error, Result};
10
11#[derive(Default, PartialEq)]
12struct EventData {
13    pub event_type: String,
14    pub data: String,
15    pub id: Option<String>,
16    pub retry: Option<u64>,
17}
18
19impl EventData {
20    fn new() -> Self {
21        Self::default()
22    }
23
24    pub fn append_data(&mut self, value: &str) {
25        self.data.push_str(value);
26        self.data.push('\n');
27    }
28
29    pub fn with_id(mut self, value: Option<String>) -> Self {
30        self.id = value;
31        self
32    }
33}
34
35#[derive(Debug, Eq, PartialEq)]
36pub enum SSE {
37    Connected(ConnectionDetails),
38    Event(Event),
39    Comment(String),
40}
41
42impl TryFrom<EventData> for Option<SSE> {
43    type Error = Error;
44
45    fn try_from(event_data: EventData) -> std::result::Result<Self, Self::Error> {
46        if event_data == EventData::default() {
47            return Err(Error::InvalidEvent);
48        }
49
50        if event_data.data.is_empty() {
51            return Ok(None);
52        }
53
54        let event_type = if event_data.event_type.is_empty() {
55            String::from("message")
56        } else {
57            event_data.event_type
58        };
59
60        let mut data = event_data.data.clone();
61        data.truncate(data.len() - 1);
62
63        let id = event_data.id.clone();
64
65        let retry = event_data.retry;
66
67        Ok(Some(SSE::Event(Event {
68            event_type,
69            data,
70            id,
71            retry,
72        })))
73    }
74}
75
76#[derive(Clone, Debug, Eq, PartialEq)]
77pub struct ConnectionDetails {
78    response: Response,
79}
80
81impl ConnectionDetails {
82    pub(crate) fn new(response: Response) -> Self {
83        Self { response }
84    }
85
86    /// Returns information describing the response at the time of connection.
87    pub fn response(&self) -> &Response {
88        &self.response
89    }
90}
91
92#[derive(Clone, Debug, Eq, PartialEq)]
93pub struct Event {
94    pub event_type: String,
95    pub data: String,
96    pub id: Option<String>,
97    pub retry: Option<u64>,
98}
99
100const LOGIFY_MAX_CHARS: usize = 100;
101fn logify(bytes: &[u8]) -> String {
102    let stringified = from_utf8(bytes).unwrap_or("<bad utf8>");
103    stringified.chars().take(LOGIFY_MAX_CHARS).collect()
104}
105
106fn parse_field(line: &[u8]) -> Result<Option<(&str, &str)>> {
107    if line.is_empty() {
108        return Err(Error::InvalidLine(
109            "should never try to parse an empty line (probably a bug)".into(),
110        ));
111    }
112
113    match line.iter().position(|&b| b':' == b) {
114        Some(0) => {
115            let value = &line[1..];
116            debug!("comment: {}", logify(value));
117            Ok(Some(("comment", parse_value(value)?)))
118        }
119        Some(colon_pos) => {
120            let key = &line[0..colon_pos];
121            let key = parse_key(key)?;
122
123            let mut value = &line[colon_pos + 1..];
124            // remove the first initial space character if any (but remove no other whitespace)
125            if value.starts_with(b" ") {
126                value = &value[1..];
127            }
128
129            debug!("key: {}, value: {}", key, logify(value));
130
131            Ok(Some((key, parse_value(value)?)))
132        }
133        None => Ok(Some((parse_key(line)?, ""))),
134    }
135}
136
137fn parse_key(key: &[u8]) -> Result<&str> {
138    from_utf8(key).map_err(|e| Error::InvalidLine(format!("malformed key: {e:?}")))
139}
140
141fn parse_value(value: &[u8]) -> Result<&str> {
142    from_utf8(value).map_err(|e| Error::InvalidLine(format!("malformed value: {e:?}")))
143}
144
145#[pin_project]
146#[must_use = "streams do nothing unless polled"]
147pub struct EventParser {
148    /// buffer for lines we know are complete (terminated) but not yet parsed into event fields, in
149    /// the order received
150    complete_lines: VecDeque<Vec<u8>>,
151    /// buffer for the most-recently received line, pending completion (by a newline terminator) or
152    /// extension (by more non-newline bytes)
153    incomplete_line: Option<Vec<u8>>,
154    /// flagged if the last character processed as a carriage return; used to help process CRLF
155    /// pairs
156    last_char_was_cr: bool,
157    /// the event data currently being decoded
158    event_data: Option<EventData>,
159    /// the last-seen event ID; events without an ID will take on this value until it is updated.
160    last_event_id: Option<String>,
161    sse: VecDeque<SSE>,
162}
163
164impl EventParser {
165    pub fn new() -> Self {
166        Self {
167            complete_lines: VecDeque::with_capacity(10),
168            incomplete_line: None,
169            last_char_was_cr: false,
170            event_data: None,
171            last_event_id: None,
172            sse: VecDeque::with_capacity(3),
173        }
174    }
175
176    pub fn was_processing(&self) -> bool {
177        if self.incomplete_line.is_some() || !self.complete_lines.is_empty() {
178            true
179        } else {
180            !self.sse.is_empty()
181        }
182    }
183
184    pub fn get_event(&mut self) -> Option<SSE> {
185        self.sse.pop_front()
186    }
187
188    pub fn process_bytes(&mut self, bytes: Bytes) -> Result<()> {
189        trace!("Parsing bytes {:?}", bytes);
190        // We get bytes from the underlying stream in chunks.  Decoding a chunk has two phases:
191        // decode the chunk into lines, and decode the lines into events.
192        //
193        // We counterintuitively do these two phases in reverse order. Because both lines and
194        // events may be split across chunks, we need to ensure we have a complete
195        // (newline-terminated) line before parsing it, and a complete event
196        // (empty-line-terminated) before returning it. So we buffer lines between poll()
197        // invocations, and begin by processing any incomplete events from previous invocations,
198        // before requesting new input from the underlying stream and processing that.
199
200        self.decode_and_buffer_lines(bytes);
201        self.parse_complete_lines_into_event()?;
202
203        Ok(())
204    }
205
206    // Populate the event fields from the complete lines already seen, until we either encounter an
207    // empty line - indicating we've decoded a complete event - or we run out of complete lines to
208    // process.
209    //
210    // Returns the event for dispatch if it is complete.
211    fn parse_complete_lines_into_event(&mut self) -> Result<()> {
212        loop {
213            let mut seen_empty_line = false;
214
215            while let Some(line) = self.complete_lines.pop_front() {
216                if line.is_empty() && self.event_data.is_some() {
217                    seen_empty_line = true;
218                    break;
219                } else if line.is_empty() {
220                    continue;
221                }
222
223                if let Some((key, value)) = parse_field(&line)? {
224                    if key == "comment" {
225                        self.sse.push_back(SSE::Comment(value.to_string()));
226                        continue;
227                    }
228
229                    let id = &self.last_event_id;
230                    let event_data = self
231                        .event_data
232                        .get_or_insert_with(|| EventData::new().with_id(id.clone()));
233
234                    if key == "event" {
235                        event_data.event_type = value.to_string()
236                    } else if key == "data" {
237                        event_data.append_data(value);
238                    } else if key == "id" {
239                        // If id contains a null byte, it is a non-fatal error and the rest of
240                        // the event should be parsed if possible.
241                        if value.chars().any(|c| c == '\0') {
242                            debug!("Ignoring event ID containing null byte");
243                            continue;
244                        }
245
246                        if value.is_empty() {
247                            self.last_event_id = Some("".to_string());
248                        } else {
249                            self.last_event_id = Some(value.to_string());
250                        }
251
252                        event_data.id.clone_from(&self.last_event_id)
253                    } else if key == "retry" {
254                        match value.parse::<u64>() {
255                            Ok(retry) => {
256                                event_data.retry = Some(retry);
257                            }
258                            _ => debug!("Failed to parse {:?} into retry value", value),
259                        };
260                    }
261                }
262            }
263
264            if seen_empty_line {
265                let event_data = self.event_data.take();
266
267                trace!(
268                    "seen empty line, event_data is {:?})",
269                    event_data.as_ref().map(|event_data| &event_data.event_type)
270                );
271
272                if let Some(event_data) = event_data {
273                    match Option::<SSE>::try_from(event_data) {
274                        Err(e) => return Err(e),
275                        Ok(None) => (),
276                        Ok(Some(event)) => self.sse.push_back(event),
277                    };
278                }
279
280                continue;
281            } else {
282                trace!("processed all complete lines but event_data not yet complete");
283            }
284
285            break;
286        }
287
288        Ok(())
289    }
290
291    // Decode a chunk into lines and buffer them for subsequent parsing, taking account of
292    // incomplete lines from previous chunks.
293    fn decode_and_buffer_lines(&mut self, chunk: Bytes) {
294        let mut lines = chunk.split_inclusive(|&b| b == b'\n' || b == b'\r');
295        // The first and last elements in this split are special. The spec requires lines to be
296        // terminated. But lines may span chunks, so:
297        //  * the last line, if non-empty (i.e. if chunk didn't end with a line terminator),
298        //    should be buffered as an incomplete line
299        //  * the first line should be appended to the incomplete line, if any
300
301        if let Some(incomplete_line) = self.incomplete_line.as_mut() {
302            if let Some(line) = lines.next() {
303                trace!(
304                    "extending line from previous chunk: {:?}+{:?}",
305                    logify(incomplete_line),
306                    logify(line)
307                );
308
309                self.last_char_was_cr = false;
310                if !line.is_empty() {
311                    // Checking the last character handles lines where the last character is a
312                    // terminator, but also where the entire line is a terminator.
313                    match line.last().unwrap() {
314                        b'\r' => {
315                            incomplete_line.extend_from_slice(&line[..line.len() - 1]);
316                            let il = self.incomplete_line.take();
317                            self.complete_lines.push_back(il.unwrap());
318                            self.last_char_was_cr = true;
319                        }
320                        b'\n' => {
321                            incomplete_line.extend_from_slice(&line[..line.len() - 1]);
322                            let il = self.incomplete_line.take();
323                            self.complete_lines.push_back(il.unwrap());
324                        }
325                        _ => incomplete_line.extend_from_slice(line),
326                    };
327                }
328            }
329        }
330
331        let mut lines = lines.peekable();
332        while let Some(line) = lines.next() {
333            if let Some(actually_complete_line) = self.incomplete_line.take() {
334                // we saw the next line, so the previous one must have been complete after all
335                trace!(
336                    "previous line was complete: {:?}",
337                    logify(&actually_complete_line)
338                );
339                self.complete_lines.push_back(actually_complete_line);
340            }
341
342            if self.last_char_was_cr && line == [b'\n'] {
343                // This is a continuation of a \r\n pair, so we can ignore this line. We do need to
344                // reset our flag though.
345                self.last_char_was_cr = false;
346                continue;
347            }
348
349            self.last_char_was_cr = false;
350            if line.ends_with(b"\r") {
351                self.complete_lines
352                    .push_back(line[..line.len() - 1].to_vec());
353                self.last_char_was_cr = true;
354            } else if line.ends_with(b"\n") {
355                // self isn't a continuation, but rather a line ending with a LF terminator.
356                self.complete_lines
357                    .push_back(line[..line.len() - 1].to_vec());
358            } else if line.is_empty() {
359                // this is the last line and it's empty, no need to buffer it
360                trace!("chunk ended with a line terminator");
361            } else if lines.peek().is_some() {
362                // this line isn't the last and we know from previous checks it doesn't end in a
363                // terminator, so we can consider it complete
364                self.complete_lines.push_back(line.to_vec());
365            } else {
366                // last line needs to be buffered as it may be incomplete
367                trace!("buffering incomplete line: {:?}", logify(line));
368                self.incomplete_line = Some(line.to_vec());
369            }
370        }
371
372        if log_enabled!(log::Level::Trace) {
373            for line in &self.complete_lines {
374                trace!("complete line: {:?}", logify(line));
375            }
376            if let Some(line) = &self.incomplete_line {
377                trace!("incomplete line: {:?}", logify(line));
378            }
379        }
380    }
381}
382
383#[cfg(test)]
384mod tests {
385    use std::str::FromStr;
386
387    use super::{Error::*, *};
388    use proptest::proptest;
389    use test_case::test_case;
390
391    fn field<'a>(key: &'a str, value: &'a str) -> Result<Option<(&'a str, &'a str)>> {
392        Ok(Some((key, value)))
393    }
394
395    /// Requires an event to be popped from the given parser.
396    /// Event properties can be asserted using a closure.
397    fn require_pop_event<F>(parser: &mut EventParser, f: F)
398    where
399        F: FnOnce(Event),
400    {
401        if let Some(SSE::Event(event)) = parser.get_event() {
402            f(event)
403        } else {
404            panic!("Event should have been received")
405        }
406    }
407
408    #[test]
409    fn test_logify_handles_code_point_boundaries() {
410        let phase = String::from_str(
411            "这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。",
412        )
413        .expect("Invalid sample string");
414
415        let input: &[u8] = phase.as_bytes();
416        let result = logify(input);
417
418        assert!(result == "这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如");
419    }
420
421    #[test]
422    fn test_parse_field_invalid() {
423        assert!(parse_field(b"").is_err());
424
425        match parse_field(b"\x80: invalid UTF-8") {
426            Err(InvalidLine(msg)) => assert!(msg.contains("Utf8Error")),
427            res => panic!("expected InvalidLine error, got {:?}", res),
428        }
429    }
430
431    #[test]
432    fn test_event_id_error_if_invalid_utf8() {
433        let mut bytes = Vec::from("id: ");
434        let mut invalid = vec![b'\xf0', b'\x28', b'\x8c', b'\xbc'];
435        bytes.append(&mut invalid);
436        bytes.push(b'\n');
437        let mut parser = EventParser::new();
438        assert!(parser.process_bytes(Bytes::from(bytes)).is_err());
439    }
440
441    #[test]
442    fn test_parse_field_comments() {
443        assert_eq!(parse_field(b":"), field("comment", ""));
444        assert_eq!(
445            parse_field(b":hello \0 world"),
446            field("comment", "hello \0 world")
447        );
448        assert_eq!(parse_field(b":event: foo"), field("comment", "event: foo"));
449    }
450
451    #[test]
452    fn test_parse_field_valid() {
453        assert_eq!(parse_field(b"event:foo"), field("event", "foo"));
454        assert_eq!(parse_field(b"event: foo"), field("event", "foo"));
455        assert_eq!(parse_field(b"event:  foo"), field("event", " foo"));
456        assert_eq!(parse_field(b"event:\tfoo"), field("event", "\tfoo"));
457        assert_eq!(parse_field(b"event: foo "), field("event", "foo "));
458
459        assert_eq!(parse_field(b"disconnect:"), field("disconnect", ""));
460        assert_eq!(parse_field(b"disconnect: "), field("disconnect", ""));
461        assert_eq!(parse_field(b"disconnect:  "), field("disconnect", " "));
462        assert_eq!(parse_field(b"disconnect:\t"), field("disconnect", "\t"));
463
464        assert_eq!(parse_field(b"disconnect"), field("disconnect", ""));
465
466        assert_eq!(parse_field(b" : foo"), field(" ", "foo"));
467        assert_eq!(parse_field(b"\xe2\x98\x83: foo"), field("☃", "foo"));
468    }
469
470    fn event(typ: &str, data: &str) -> SSE {
471        SSE::Event(Event {
472            data: data.to_string(),
473            id: None,
474            event_type: typ.to_string(),
475            retry: None,
476        })
477    }
478
479    fn event_with_id(typ: &str, data: &str, id: &str) -> SSE {
480        SSE::Event(Event {
481            data: data.to_string(),
482            id: Some(id.to_string()),
483            event_type: typ.to_string(),
484            retry: None,
485        })
486    }
487
488    #[test]
489    fn test_event_without_data_yields_no_event() {
490        let mut parser = EventParser::new();
491        assert!(parser.process_bytes(Bytes::from("id: abc\n\n")).is_ok());
492        assert!(parser.get_event().is_none());
493    }
494
495    #[test]
496    fn test_ignore_id_containing_null() {
497        let mut parser = EventParser::new();
498        assert!(parser
499            .process_bytes(Bytes::from("id: a\x00bc\nevent: add\ndata: abc\n\n"))
500            .is_ok());
501
502        if let Some(SSE::Event(event)) = parser.get_event() {
503            assert!(event.id.is_none());
504        } else {
505            panic!("Event should have been received");
506        }
507    }
508
509    #[test_case("event: add\ndata: hello\n\n", "add".into())]
510    #[test_case("data: hello\n\n", "message".into())]
511    fn test_event_can_parse_type_correctly(chunk: &'static str, event_type: String) {
512        let mut parser = EventParser::new();
513
514        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());
515
516        require_pop_event(&mut parser, |e| assert_eq!(event_type, e.event_type));
517    }
518
519    #[test_case("data: hello\n\n", event("message", "hello"); "parses event body with LF")]
520    #[test_case("data: hello\n\r", event("message", "hello"); "parses event body with LF and trailing CR")]
521    #[test_case("data: hello\r\n\n", event("message", "hello"); "parses event body with CRLF")]
522    #[test_case("data: hello\r\n\r", event("message", "hello"); "parses event body with CRLF and trailing CR")]
523    #[test_case("data: hello\r\r", event("message", "hello"); "parses event body with CR")]
524    #[test_case("data: hello\r\r\n", event("message", "hello"); "parses event body with CR and trailing CRLF")]
525    #[test_case("id: 1\ndata: hello\n\n", event_with_id("message", "hello", "1"))]
526    #[test_case("id: 😀\ndata: hello\n\n", event_with_id("message", "hello", "😀"))]
527    fn test_decode_chunks_simple(chunk: &'static str, event: SSE) {
528        let mut parser = EventParser::new();
529        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());
530        assert_eq!(parser.get_event().unwrap(), event);
531        assert!(parser.get_event().is_none());
532    }
533
534    #[test_case("persistent-event-id.sse"; "persistent-event-id.sse")]
535    fn test_last_id_persists_if_not_overridden(file: &str) {
536        let contents = read_contents_from_file(file);
537        let mut parser = EventParser::new();
538        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());
539
540        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("1".into())));
541        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("1".into())));
542        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("3".into())));
543        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("3".into())));
544    }
545
546    #[test_case(b":hello\n"; "with LF")]
547    #[test_case(b":hello\r"; "with CR")]
548    #[test_case(b":hello\r\n"; "with CRLF")]
549    fn test_decode_chunks_comments_are_generated(chunk: &'static [u8]) {
550        let mut parser = EventParser::new();
551        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());
552        assert!(parser.get_event().is_some());
553    }
554
555    #[test]
556    fn test_comment_is_separate_from_event() {
557        let mut parser = EventParser::new();
558        let result = parser.process_bytes(Bytes::from(":comment\ndata:hello\n\n"));
559        assert!(result.is_ok());
560
561        let comment = parser.get_event();
562        assert!(matches!(comment, Some(SSE::Comment(_))));
563
564        let event = parser.get_event();
565        assert!(matches!(event, Some(SSE::Event(_))));
566
567        assert!(parser.get_event().is_none());
568    }
569
570    #[test]
571    fn test_comment_with_trailing_blank_line() {
572        let mut parser = EventParser::new();
573        let result = parser.process_bytes(Bytes::from(":comment\n\r\n\r"));
574        assert!(result.is_ok());
575
576        let comment = parser.get_event();
577        assert!(matches!(comment, Some(SSE::Comment(_))));
578
579        assert!(parser.get_event().is_none());
580    }
581
582    #[test_case(&["data:", "hello\n\n"], event("message", "hello"); "data split")]
583    #[test_case(&["data:hell", "o\n\n"], event("message", "hello"); "data truncated")]
584    fn test_decode_message_split_across_chunks(chunks: &[&'static str], event: SSE) {
585        let mut parser = EventParser::new();
586
587        if let Some((last, chunks)) = chunks.split_last() {
588            for chunk in chunks {
589                assert!(parser.process_bytes(Bytes::from(*chunk)).is_ok());
590                assert!(parser.get_event().is_none());
591            }
592
593            assert!(parser.process_bytes(Bytes::from(*last)).is_ok());
594            assert_eq!(parser.get_event(), Some(event));
595            assert!(parser.get_event().is_none());
596        } else {
597            panic!("Failed to split last");
598        }
599    }
600
601    #[test_case(&["data:hell", "o\n\ndata:", "world\n\n"], &[event("message", "hello"), event("message", "world")]; "with lf")]
602    #[test_case(&["data:hell", "o\r\rdata:", "world\r\r"], &[event("message", "hello"), event("message", "world")]; "with cr")]
603    #[test_case(&["data:hell", "o\r\n\ndata:", "world\r\n\n"], &[event("message", "hello"), event("message", "world")]; "with crlf")]
604    fn test_decode_multiple_messages_split_across_chunks(chunks: &[&'static str], events: &[SSE]) {
605        let mut parser = EventParser::new();
606
607        for chunk in chunks {
608            assert!(parser.process_bytes(Bytes::from(*chunk)).is_ok());
609        }
610
611        for event in events {
612            assert_eq!(parser.get_event().unwrap(), *event);
613        }
614
615        assert!(parser.get_event().is_none());
616    }
617
618    #[test]
619    fn test_decode_line_split_across_chunks() {
620        let mut parser = EventParser::new();
621        assert!(parser.process_bytes(Bytes::from("data:foo")).is_ok());
622        assert!(parser.process_bytes(Bytes::from("")).is_ok());
623        assert!(parser.process_bytes(Bytes::from("baz\n\n")).is_ok());
624        assert_eq!(parser.get_event(), Some(event("message", "foobaz")));
625        assert!(parser.get_event().is_none());
626
627        assert!(parser.process_bytes(Bytes::from("data:foo")).is_ok());
628        assert!(parser.process_bytes(Bytes::from("bar")).is_ok());
629        assert!(parser.process_bytes(Bytes::from("baz\n\n")).is_ok());
630        assert_eq!(parser.get_event(), Some(event("message", "foobarbaz")));
631        assert!(parser.get_event().is_none());
632    }
633
634    #[test]
635    fn test_decode_concatenates_multiple_values_for_same_field() {
636        let mut parser = EventParser::new();
637        assert!(parser.process_bytes(Bytes::from("data:hello\n")).is_ok());
638        assert!(parser.process_bytes(Bytes::from("data:world\n\n")).is_ok());
639        assert_eq!(parser.get_event(), Some(event("message", "hello\nworld")));
640        assert!(parser.get_event().is_none());
641    }
642
643    #[test_case("\n\n\n\n" ; "all LFs")]
644    #[test_case("\r\r\r\r" ; "all CRs")]
645    #[test_case("\r\n\r\n\r\n\r\n" ; "all CRLFs")]
646    fn test_decode_repeated_terminators(chunk: &'static str) {
647        let mut parser = EventParser::new();
648        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());
649
650        // spec seems unclear on whether this should actually dispatch empty events, but that seems
651        // unhelpful for all practical purposes
652        assert!(parser.get_event().is_none());
653    }
654
655    #[test]
656    fn test_decode_extra_terminators_between_events() {
657        let mut parser = EventParser::new();
658        assert!(parser
659            .process_bytes(Bytes::from("data: abc\n\n\ndata: def\n\n"))
660            .is_ok());
661
662        assert_eq!(parser.get_event(), Some(event("message", "abc")));
663        assert_eq!(parser.get_event(), Some(event("message", "def")));
664        assert!(parser.get_event().is_none());
665    }
666
667    #[test_case("one-event.sse"; "one-event.sse")]
668    #[test_case("one-event-crlf.sse"; "one-event-crlf.sse")]
669    fn test_decode_one_event(file: &str) {
670        let contents = read_contents_from_file(file);
671        let mut parser = EventParser::new();
672        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());
673
674        require_pop_event(&mut parser, |e| {
675            assert_eq!(e.event_type, "patch");
676            assert!(e
677                .data
678                .contains(r#"path":"/flags/goals.02.featureWithGoals"#));
679        });
680    }
681
682    #[test_case("two-events.sse"; "two-events.sse")]
683    #[test_case("two-events-crlf.sse"; "two-events-crlf.sse")]
684    fn test_decode_two_events(file: &str) {
685        let contents = read_contents_from_file(file);
686        let mut parser = EventParser::new();
687        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());
688
689        require_pop_event(&mut parser, |e| {
690            assert_eq!(e.event_type, "one");
691            assert_eq!(e.data, "One");
692        });
693
694        require_pop_event(&mut parser, |e| {
695            assert_eq!(e.event_type, "two");
696            assert_eq!(e.data, "Two");
697        });
698    }
699
700    #[test_case("big-event-followed-by-another.sse"; "big-event-followed-by-another.sse")]
701    #[test_case("big-event-followed-by-another-crlf.sse"; "big-event-followed-by-another-crlf.sse")]
702    fn test_decode_big_event_followed_by_another(file: &str) {
703        let contents = read_contents_from_file(file);
704        let mut parser = EventParser::new();
705        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());
706
707        require_pop_event(&mut parser, |e| {
708            assert_eq!(e.event_type, "patch");
709            assert!(e.data.len() > 10_000);
710            assert!(e.data.contains(r#"path":"/flags/big.00.bigFeatureKey"#));
711        });
712
713        require_pop_event(&mut parser, |e| {
714            assert_eq!(e.event_type, "patch");
715            assert!(e
716                .data
717                .contains(r#"path":"/flags/goals.02.featureWithGoals"#));
718        });
719    }
720
721    fn read_contents_from_file(name: &str) -> Vec<u8> {
722        std::fs::read(format!("test-data/{}", name))
723            .unwrap_or_else(|_| panic!("couldn't read {}", name))
724    }
725
726    proptest! {
727        #[test]
728        fn test_decode_and_buffer_lines_does_not_crash(next in "(\r\n|\r|\n)*event: [^\n\r:]*(\r\n|\r|\n)", previous in "(\r\n|\r|\n)*event: [^\n\r:]*(\r\n|\r|\n)") {
729            let mut parser = EventParser::new();
730            parser.incomplete_line = Some(previous.as_bytes().to_vec());
731            parser.decode_and_buffer_lines(Bytes::from(next));
732        }
733    }
734}