1use std::{collections::VecDeque, convert::TryFrom, str::from_utf8};
2
3use hyper::body::Bytes;
4use log::{debug, log_enabled, trace};
5use pin_project::pin_project;
6
7use crate::response::Response;
8
9use super::error::{Error, Result};
10
/// Scratch space holding the fields of one event while its lines are parsed.
#[derive(Default, PartialEq)]
struct EventData {
    /// Value of the most recent `event` field; empty if none was supplied.
    pub event_type: String,
    /// Every `data` value received so far, each one followed by `'\n'`.
    pub data: String,
    /// Event ID associated with this event, if any.
    pub id: Option<String>,
    /// Most recent `retry` value that was accepted, if any.
    pub retry: Option<u64>,
}

impl EventData {
    /// Builds an `EventData` in which every field holds its default value.
    fn new() -> Self {
        Default::default()
    }

    /// Appends `value` to the data buffer and terminates it with a newline,
    /// so that consecutive `data` fields end up on separate lines.
    pub fn append_data(&mut self, value: &str) {
        self.data += value;
        self.data += "\n";
    }

    /// Consumes the value and returns it with `id` replaced by `value`.
    pub fn with_id(self, value: Option<String>) -> Self {
        Self { id: value, ..self }
    }
}
34
/// An item produced while consuming a server-sent event stream.
#[derive(Debug, Eq, PartialEq)]
pub enum SSE {
    /// Carries the [`ConnectionDetails`] for a connection.
    // NOTE(review): nothing in this file constructs this variant; presumably
    // the client emits it once the stream is established — confirm there.
    Connected(ConnectionDetails),
    /// A complete event assembled from the fields of the stream.
    Event(Event),
    /// The text of a line that started with `:`, without that leading colon.
    Comment(String),
}
41
42impl TryFrom<EventData> for Option<SSE> {
43 type Error = Error;
44
45 fn try_from(event_data: EventData) -> std::result::Result<Self, Self::Error> {
46 if event_data == EventData::default() {
47 return Err(Error::InvalidEvent);
48 }
49
50 if event_data.data.is_empty() {
51 return Ok(None);
52 }
53
54 let event_type = if event_data.event_type.is_empty() {
55 String::from("message")
56 } else {
57 event_data.event_type
58 };
59
60 let mut data = event_data.data.clone();
61 data.truncate(data.len() - 1);
62
63 let id = event_data.id.clone();
64
65 let retry = event_data.retry;
66
67 Ok(Some(SSE::Event(Event {
68 event_type,
69 data,
70 id,
71 retry,
72 })))
73 }
74}
75
76#[derive(Clone, Debug, Eq, PartialEq)]
77pub struct ConnectionDetails {
78 response: Response,
79}
80
81impl ConnectionDetails {
82 pub(crate) fn new(response: Response) -> Self {
83 Self { response }
84 }
85
86 pub fn response(&self) -> &Response {
88 &self.response
89 }
90}
91
/// A fully parsed server-sent event.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct Event {
    /// Event type; `"message"` when the stream did not supply one.
    pub event_type: String,
    /// Values of the event's `data` fields joined with `'\n'`.
    pub data: String,
    /// Event ID in effect for this event, if any.
    pub id: Option<String>,
    /// Value of the event's `retry` field, if one was accepted.
    pub retry: Option<u64>,
}
99
/// Upper bound on the number of characters `logify` returns.
const LOGIFY_MAX_CHARS: usize = 100;

/// Renders `bytes` for log output.
///
/// Valid UTF-8 is cut after at most `LOGIFY_MAX_CHARS` characters (never in
/// the middle of a code point); anything else is replaced by the placeholder
/// `<bad utf8>`.
fn logify(bytes: &[u8]) -> String {
    let text = match from_utf8(bytes) {
        Ok(valid) => valid,
        Err(_) => "<bad utf8>",
    };

    // Byte offset of the first character past the limit, if there is one.
    match text.char_indices().nth(LOGIFY_MAX_CHARS) {
        Some((cut, _)) => text[..cut].to_owned(),
        None => text.to_owned(),
    }
}
105
106fn parse_field(line: &[u8]) -> Result<Option<(&str, &str)>> {
107 if line.is_empty() {
108 return Err(Error::InvalidLine(
109 "should never try to parse an empty line (probably a bug)".into(),
110 ));
111 }
112
113 match line.iter().position(|&b| b':' == b) {
114 Some(0) => {
115 let value = &line[1..];
116 debug!("comment: {}", logify(value));
117 Ok(Some(("comment", parse_value(value)?)))
118 }
119 Some(colon_pos) => {
120 let key = &line[0..colon_pos];
121 let key = parse_key(key)?;
122
123 let mut value = &line[colon_pos + 1..];
124 if value.starts_with(b" ") {
126 value = &value[1..];
127 }
128
129 debug!("key: {}, value: {}", key, logify(value));
130
131 Ok(Some((key, parse_value(value)?)))
132 }
133 None => Ok(Some((parse_key(line)?, ""))),
134 }
135}
136
137fn parse_key(key: &[u8]) -> Result<&str> {
138 from_utf8(key).map_err(|e| Error::InvalidLine(format!("malformed key: {e:?}")))
139}
140
141fn parse_value(value: &[u8]) -> Result<&str> {
142 from_utf8(value).map_err(|e| Error::InvalidLine(format!("malformed value: {e:?}")))
143}
144
/// Incremental parser that turns chunks of bytes into [`SSE`] items.
///
/// Bytes are fed in with `process_bytes`; parsed items are taken out with
/// `get_event`.
#[pin_project]
#[must_use = "streams do nothing unless polled"]
pub struct EventParser {
    /// Lines whose terminator has been seen and removed, waiting to be parsed.
    complete_lines: VecDeque<Vec<u8>>,
    /// Bytes at the end of the latest chunk that were not followed by a line
    /// terminator; they are extended by the start of the next chunk.
    incomplete_line: Option<Vec<u8>>,
    /// Whether the most recently buffered line was ended by `'\r'`, so that a
    /// directly following `'\n'` is skipped instead of producing an empty
    /// line.
    last_char_was_cr: bool,
    /// Fields of the event currently being assembled, if any.
    event_data: Option<EventData>,
    /// Value of the most recent accepted `id` field; used as the initial ID
    /// of each newly started event.
    last_event_id: Option<String>,
    /// Parsed items that have not yet been handed out by `get_event`.
    sse: VecDeque<SSE>,
}
163
164impl EventParser {
165 pub fn new() -> Self {
166 Self {
167 complete_lines: VecDeque::with_capacity(10),
168 incomplete_line: None,
169 last_char_was_cr: false,
170 event_data: None,
171 last_event_id: None,
172 sse: VecDeque::with_capacity(3),
173 }
174 }
175
176 pub fn was_processing(&self) -> bool {
177 if self.incomplete_line.is_some() || !self.complete_lines.is_empty() {
178 true
179 } else {
180 !self.sse.is_empty()
181 }
182 }
183
184 pub fn get_event(&mut self) -> Option<SSE> {
185 self.sse.pop_front()
186 }
187
188 pub fn process_bytes(&mut self, bytes: Bytes) -> Result<()> {
189 trace!("Parsing bytes {:?}", bytes);
190 self.decode_and_buffer_lines(bytes);
201 self.parse_complete_lines_into_event()?;
202
203 Ok(())
204 }
205
206 fn parse_complete_lines_into_event(&mut self) -> Result<()> {
212 loop {
213 let mut seen_empty_line = false;
214
215 while let Some(line) = self.complete_lines.pop_front() {
216 if line.is_empty() && self.event_data.is_some() {
217 seen_empty_line = true;
218 break;
219 } else if line.is_empty() {
220 continue;
221 }
222
223 if let Some((key, value)) = parse_field(&line)? {
224 if key == "comment" {
225 self.sse.push_back(SSE::Comment(value.to_string()));
226 continue;
227 }
228
229 let id = &self.last_event_id;
230 let event_data = self
231 .event_data
232 .get_or_insert_with(|| EventData::new().with_id(id.clone()));
233
234 if key == "event" {
235 event_data.event_type = value.to_string()
236 } else if key == "data" {
237 event_data.append_data(value);
238 } else if key == "id" {
239 if value.chars().any(|c| c == '\0') {
242 debug!("Ignoring event ID containing null byte");
243 continue;
244 }
245
246 if value.is_empty() {
247 self.last_event_id = Some("".to_string());
248 } else {
249 self.last_event_id = Some(value.to_string());
250 }
251
252 event_data.id.clone_from(&self.last_event_id)
253 } else if key == "retry" {
254 match value.parse::<u64>() {
255 Ok(retry) => {
256 event_data.retry = Some(retry);
257 }
258 _ => debug!("Failed to parse {:?} into retry value", value),
259 };
260 }
261 }
262 }
263
264 if seen_empty_line {
265 let event_data = self.event_data.take();
266
267 trace!(
268 "seen empty line, event_data is {:?})",
269 event_data.as_ref().map(|event_data| &event_data.event_type)
270 );
271
272 if let Some(event_data) = event_data {
273 match Option::<SSE>::try_from(event_data) {
274 Err(e) => return Err(e),
275 Ok(None) => (),
276 Ok(Some(event)) => self.sse.push_back(event),
277 };
278 }
279
280 continue;
281 } else {
282 trace!("processed all complete lines but event_data not yet complete");
283 }
284
285 break;
286 }
287
288 Ok(())
289 }
290
291 fn decode_and_buffer_lines(&mut self, chunk: Bytes) {
294 let mut lines = chunk.split_inclusive(|&b| b == b'\n' || b == b'\r');
295 if let Some(incomplete_line) = self.incomplete_line.as_mut() {
302 if let Some(line) = lines.next() {
303 trace!(
304 "extending line from previous chunk: {:?}+{:?}",
305 logify(incomplete_line),
306 logify(line)
307 );
308
309 self.last_char_was_cr = false;
310 if !line.is_empty() {
311 match line.last().unwrap() {
314 b'\r' => {
315 incomplete_line.extend_from_slice(&line[..line.len() - 1]);
316 let il = self.incomplete_line.take();
317 self.complete_lines.push_back(il.unwrap());
318 self.last_char_was_cr = true;
319 }
320 b'\n' => {
321 incomplete_line.extend_from_slice(&line[..line.len() - 1]);
322 let il = self.incomplete_line.take();
323 self.complete_lines.push_back(il.unwrap());
324 }
325 _ => incomplete_line.extend_from_slice(line),
326 };
327 }
328 }
329 }
330
331 let mut lines = lines.peekable();
332 while let Some(line) = lines.next() {
333 if let Some(actually_complete_line) = self.incomplete_line.take() {
334 trace!(
336 "previous line was complete: {:?}",
337 logify(&actually_complete_line)
338 );
339 self.complete_lines.push_back(actually_complete_line);
340 }
341
342 if self.last_char_was_cr && line == [b'\n'] {
343 self.last_char_was_cr = false;
346 continue;
347 }
348
349 self.last_char_was_cr = false;
350 if line.ends_with(b"\r") {
351 self.complete_lines
352 .push_back(line[..line.len() - 1].to_vec());
353 self.last_char_was_cr = true;
354 } else if line.ends_with(b"\n") {
355 self.complete_lines
357 .push_back(line[..line.len() - 1].to_vec());
358 } else if line.is_empty() {
359 trace!("chunk ended with a line terminator");
361 } else if lines.peek().is_some() {
362 self.complete_lines.push_back(line.to_vec());
365 } else {
366 trace!("buffering incomplete line: {:?}", logify(line));
368 self.incomplete_line = Some(line.to_vec());
369 }
370 }
371
372 if log_enabled!(log::Level::Trace) {
373 for line in &self.complete_lines {
374 trace!("complete line: {:?}", logify(line));
375 }
376 if let Some(line) = &self.incomplete_line {
377 trace!("incomplete line: {:?}", logify(line));
378 }
379 }
380 }
381}
382
/// Unit tests for field parsing, line buffering and event assembly.
///
/// Tests that take a file name read their fixture from the `test-data/`
/// directory relative to the working directory of the test run.
#[cfg(test)]
mod tests {
    use super::{Error::*, *};
    use proptest::proptest;
    use test_case::test_case;

    /// Builds the value `parse_field` is expected to return for a field.
    fn field<'a>(key: &'a str, value: &'a str) -> Result<Option<(&'a str, &'a str)>> {
        Ok(Some((key, value)))
    }

    /// Pops the next item from `parser`, panics unless it is an
    /// `SSE::Event`, and hands that event to `f`.
    fn require_pop_event<F>(parser: &mut EventParser, f: F)
    where
        F: FnOnce(Event),
    {
        if let Some(SSE::Event(event)) = parser.get_event() {
            f(event)
        } else {
            panic!("Event should have been received")
        }
    }

    // Truncation must count characters, not bytes, so that multi-byte text is
    // never cut inside a code point.
    #[test]
    fn test_logify_handles_code_point_boundaries() {
        let phase = "这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。";

        let input: &[u8] = phase.as_bytes();
        let result = logify(input);

        assert_eq!(result, "这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如此。这是一条很长的消息,最初导致我们的代码出现恐慌。我希望情况不再如");
    }

    #[test]
    fn test_parse_field_invalid() {
        assert!(parse_field(b"").is_err());

        match parse_field(b"\x80: invalid UTF-8") {
            Err(InvalidLine(msg)) => assert!(msg.contains("Utf8Error")),
            res => panic!("expected InvalidLine error, got {:?}", res),
        }
    }

    #[test]
    fn test_event_id_error_if_invalid_utf8() {
        let mut bytes = Vec::from("id: ");
        let mut invalid = vec![b'\xf0', b'\x28', b'\x8c', b'\xbc'];
        bytes.append(&mut invalid);
        bytes.push(b'\n');
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(bytes)).is_err());
    }

    #[test]
    fn test_parse_field_comments() {
        assert_eq!(parse_field(b":"), field("comment", ""));
        assert_eq!(
            parse_field(b":hello \0 world"),
            field("comment", "hello \0 world")
        );
        assert_eq!(parse_field(b":event: foo"), field("comment", "event: foo"));
    }

    #[test]
    fn test_parse_field_valid() {
        assert_eq!(parse_field(b"event:foo"), field("event", "foo"));
        assert_eq!(parse_field(b"event: foo"), field("event", "foo"));
        // Only ONE space after the colon is stripped: the input needs two
        // spaces for the value to keep a leading space.
        assert_eq!(parse_field(b"event:  foo"), field("event", " foo"));
        assert_eq!(parse_field(b"event:\tfoo"), field("event", "\tfoo"));
        assert_eq!(parse_field(b"event: foo "), field("event", "foo "));

        assert_eq!(parse_field(b"disconnect:"), field("disconnect", ""));
        assert_eq!(parse_field(b"disconnect: "), field("disconnect", ""));
        // Two spaces in, one space out.
        assert_eq!(parse_field(b"disconnect:  "), field("disconnect", " "));
        assert_eq!(parse_field(b"disconnect:\t"), field("disconnect", "\t"));

        assert_eq!(parse_field(b"disconnect"), field("disconnect", ""));

        assert_eq!(parse_field(b" : foo"), field(" ", "foo"));
        assert_eq!(parse_field(b"\xe2\x98\x83: foo"), field("☃", "foo"));
    }

    /// Builds an expected event without an ID or retry value.
    fn event(typ: &str, data: &str) -> SSE {
        SSE::Event(Event {
            data: data.to_string(),
            id: None,
            event_type: typ.to_string(),
            retry: None,
        })
    }

    /// Builds an expected event with the given ID and no retry value.
    fn event_with_id(typ: &str, data: &str, id: &str) -> SSE {
        SSE::Event(Event {
            data: data.to_string(),
            id: Some(id.to_string()),
            event_type: typ.to_string(),
            retry: None,
        })
    }

    #[test]
    fn test_event_without_data_yields_no_event() {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from("id: abc\n\n")).is_ok());
        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_ignore_id_containing_null() {
        let mut parser = EventParser::new();
        assert!(parser
            .process_bytes(Bytes::from("id: a\x00bc\nevent: add\ndata: abc\n\n"))
            .is_ok());

        if let Some(SSE::Event(event)) = parser.get_event() {
            assert!(event.id.is_none());
        } else {
            panic!("Event should have been received");
        }
    }

    #[test_case("event: add\ndata: hello\n\n", "add".into())]
    #[test_case("data: hello\n\n", "message".into())]
    fn test_event_can_parse_type_correctly(chunk: &'static str, event_type: String) {
        let mut parser = EventParser::new();

        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());

        require_pop_event(&mut parser, |e| assert_eq!(event_type, e.event_type));
    }

    #[test_case("data: hello\n\n", event("message", "hello"); "parses event body with LF")]
    #[test_case("data: hello\n\r", event("message", "hello"); "parses event body with LF and trailing CR")]
    #[test_case("data: hello\r\n\n", event("message", "hello"); "parses event body with CRLF")]
    #[test_case("data: hello\r\n\r", event("message", "hello"); "parses event body with CRLF and trailing CR")]
    #[test_case("data: hello\r\r", event("message", "hello"); "parses event body with CR")]
    #[test_case("data: hello\r\r\n", event("message", "hello"); "parses event body with CR and trailing CRLF")]
    #[test_case("id: 1\ndata: hello\n\n", event_with_id("message", "hello", "1"))]
    #[test_case("id: 😀\ndata: hello\n\n", event_with_id("message", "hello", "😀"))]
    fn test_decode_chunks_simple(chunk: &'static str, event: SSE) {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());
        assert_eq!(parser.get_event().unwrap(), event);
        assert!(parser.get_event().is_none());
    }

    #[test_case("persistent-event-id.sse"; "persistent-event-id.sse")]
    fn test_last_id_persists_if_not_overridden(file: &str) {
        let contents = read_contents_from_file(file);
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());

        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("1".into())));
        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("1".into())));
        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("3".into())));
        require_pop_event(&mut parser, |e| assert_eq!(e.id, Some("3".into())));
    }

    #[test_case(b":hello\n"; "with LF")]
    #[test_case(b":hello\r"; "with CR")]
    #[test_case(b":hello\r\n"; "with CRLF")]
    fn test_decode_chunks_comments_are_generated(chunk: &'static [u8]) {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());
        assert!(parser.get_event().is_some());
    }

    #[test]
    fn test_comment_is_separate_from_event() {
        let mut parser = EventParser::new();
        let result = parser.process_bytes(Bytes::from(":comment\ndata:hello\n\n"));
        assert!(result.is_ok());

        let comment = parser.get_event();
        assert!(matches!(comment, Some(SSE::Comment(_))));

        let event = parser.get_event();
        assert!(matches!(event, Some(SSE::Event(_))));

        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_comment_with_trailing_blank_line() {
        let mut parser = EventParser::new();
        let result = parser.process_bytes(Bytes::from(":comment\n\r\n\r"));
        assert!(result.is_ok());

        let comment = parser.get_event();
        assert!(matches!(comment, Some(SSE::Comment(_))));

        assert!(parser.get_event().is_none());
    }

    // No event may be produced until the chunk holding the terminating blank
    // line has been processed.
    #[test_case(&["data:", "hello\n\n"], event("message", "hello"); "data split")]
    #[test_case(&["data:hell", "o\n\n"], event("message", "hello"); "data truncated")]
    fn test_decode_message_split_across_chunks(chunks: &[&'static str], event: SSE) {
        let mut parser = EventParser::new();

        if let Some((last, chunks)) = chunks.split_last() {
            for chunk in chunks {
                assert!(parser.process_bytes(Bytes::from(*chunk)).is_ok());
                assert!(parser.get_event().is_none());
            }

            assert!(parser.process_bytes(Bytes::from(*last)).is_ok());
            assert_eq!(parser.get_event(), Some(event));
            assert!(parser.get_event().is_none());
        } else {
            panic!("Failed to split last");
        }
    }

    #[test_case(&["data:hell", "o\n\ndata:", "world\n\n"], &[event("message", "hello"), event("message", "world")]; "with lf")]
    #[test_case(&["data:hell", "o\r\rdata:", "world\r\r"], &[event("message", "hello"), event("message", "world")]; "with cr")]
    #[test_case(&["data:hell", "o\r\n\ndata:", "world\r\n\n"], &[event("message", "hello"), event("message", "world")]; "with crlf")]
    fn test_decode_multiple_messages_split_across_chunks(chunks: &[&'static str], events: &[SSE]) {
        let mut parser = EventParser::new();

        for chunk in chunks {
            assert!(parser.process_bytes(Bytes::from(*chunk)).is_ok());
        }

        for event in events {
            assert_eq!(parser.get_event().unwrap(), *event);
        }

        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_decode_line_split_across_chunks() {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from("data:foo")).is_ok());
        // An empty chunk must leave the buffered partial line untouched.
        assert!(parser.process_bytes(Bytes::from("")).is_ok());
        assert!(parser.process_bytes(Bytes::from("baz\n\n")).is_ok());
        assert_eq!(parser.get_event(), Some(event("message", "foobaz")));
        assert!(parser.get_event().is_none());

        assert!(parser.process_bytes(Bytes::from("data:foo")).is_ok());
        assert!(parser.process_bytes(Bytes::from("bar")).is_ok());
        assert!(parser.process_bytes(Bytes::from("baz\n\n")).is_ok());
        assert_eq!(parser.get_event(), Some(event("message", "foobarbaz")));
        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_decode_concatenates_multiple_values_for_same_field() {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from("data:hello\n")).is_ok());
        assert!(parser.process_bytes(Bytes::from("data:world\n\n")).is_ok());
        assert_eq!(parser.get_event(), Some(event("message", "hello\nworld")));
        assert!(parser.get_event().is_none());
    }

    #[test_case("\n\n\n\n" ; "all LFs")]
    #[test_case("\r\r\r\r" ; "all CRs")]
    #[test_case("\r\n\r\n\r\n\r\n" ; "all CRLFs")]
    fn test_decode_repeated_terminators(chunk: &'static str) {
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(chunk)).is_ok());

        assert!(parser.get_event().is_none());
    }

    #[test]
    fn test_decode_extra_terminators_between_events() {
        let mut parser = EventParser::new();
        assert!(parser
            .process_bytes(Bytes::from("data: abc\n\n\ndata: def\n\n"))
            .is_ok());

        assert_eq!(parser.get_event(), Some(event("message", "abc")));
        assert_eq!(parser.get_event(), Some(event("message", "def")));
        assert!(parser.get_event().is_none());
    }

    #[test_case("one-event.sse"; "one-event.sse")]
    #[test_case("one-event-crlf.sse"; "one-event-crlf.sse")]
    fn test_decode_one_event(file: &str) {
        let contents = read_contents_from_file(file);
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "patch");
            assert!(e
                .data
                .contains(r#"path":"/flags/goals.02.featureWithGoals"#));
        });
    }

    #[test_case("two-events.sse"; "two-events.sse")]
    #[test_case("two-events-crlf.sse"; "two-events-crlf.sse")]
    fn test_decode_two_events(file: &str) {
        let contents = read_contents_from_file(file);
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "one");
            assert_eq!(e.data, "One");
        });

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "two");
            assert_eq!(e.data, "Two");
        });
    }

    #[test_case("big-event-followed-by-another.sse"; "big-event-followed-by-another.sse")]
    #[test_case("big-event-followed-by-another-crlf.sse"; "big-event-followed-by-another-crlf.sse")]
    fn test_decode_big_event_followed_by_another(file: &str) {
        let contents = read_contents_from_file(file);
        let mut parser = EventParser::new();
        assert!(parser.process_bytes(Bytes::from(contents)).is_ok());

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "patch");
            assert!(e.data.len() > 10_000);
            assert!(e.data.contains(r#"path":"/flags/big.00.bigFeatureKey"#));
        });

        require_pop_event(&mut parser, |e| {
            assert_eq!(e.event_type, "patch");
            assert!(e
                .data
                .contains(r#"path":"/flags/goals.02.featureWithGoals"#));
        });
    }

    /// Reads the fixture `name` from the `test-data/` directory, panicking if
    /// it cannot be read.
    fn read_contents_from_file(name: &str) -> Vec<u8> {
        std::fs::read(format!("test-data/{}", name))
            .unwrap_or_else(|_| panic!("couldn't read {}", name))
    }

    proptest! {
        // Line buffering must not panic for any combination of a leftover
        // line and a following chunk, whatever terminators they contain.
        #[test]
        fn test_decode_and_buffer_lines_does_not_crash(next in "(\r\n|\r|\n)*event: [^\n\r:]*(\r\n|\r|\n)", previous in "(\r\n|\r|\n)*event: [^\n\r:]*(\r\n|\r|\n)") {
            let mut parser = EventParser::new();
            parser.incomplete_line = Some(previous.as_bytes().to_vec());
            parser.decode_and_buffer_lines(Bytes::from(next));
        }
    }
}