use std::{io::Write, path::Path};
use serde::Deserialize;
use thiserror::Error;
use uuid::Uuid;
use super::{
attachment::AttachmentType,
v7::{Attachment, Event, SampleProfile, SessionAggregates, SessionUpdate, Transaction},
};
/// Errors produced while reading an [`Envelope`] from bytes or from a file.
#[derive(Debug, Error)]
pub enum EnvelopeError {
/// The input ended before an expected item header, or before the end of a payload with a
/// declared length. `Envelope::from_path` also reports any file read failure as this variant.
#[error("unexpected end of file")]
UnexpectedEof,
/// No JSON value was found where the envelope header was expected.
#[error("missing envelope header")]
MissingHeader,
/// The remaining input could not be sliced at the offset of the next item header.
#[error("missing item header")]
MissingItemHeader,
/// A header or a length-delimited payload was followed by a byte other than `\n`.
#[error("missing newline after header or payload")]
MissingNewline,
/// The envelope header could not be deserialized; the JSON error is kept as the source.
#[error("invalid envelope header")]
InvalidHeader(#[source] serde_json::Error),
/// An item header could not be deserialized; the JSON error is kept as the source.
#[error("invalid item header")]
InvalidItemHeader(#[source] serde_json::Error),
/// A JSON item payload could not be deserialized; the JSON error is kept as the source.
#[error("invalid item payload")]
InvalidItemPayload(#[source] serde_json::Error),
}
/// The first JSON object of a serialized envelope, as read by `Envelope::parse_header`.
#[derive(Deserialize)]
struct EnvelopeHeader {
// Optional: an envelope header without `event_id` deserializes to `None`.
event_id: Option<Uuid>,
}
/// The `type` field of an item header.
///
/// Each variant is renamed to the lowercase name used on the wire; note that `SessionUpdate`
/// maps to `"session"` and `SessionAggregates` to `"sessions"`.
#[derive(Clone, Debug, Eq, PartialEq, Deserialize)]
enum EnvelopeItemType {
/// Item whose payload is parsed as an `Event`.
#[serde(rename = "event")]
Event,
/// Item whose payload is parsed as a `SessionUpdate`.
#[serde(rename = "session")]
SessionUpdate,
/// Item whose payload is parsed as `SessionAggregates`.
#[serde(rename = "sessions")]
SessionAggregates,
/// Item whose payload is parsed as a `Transaction`.
#[serde(rename = "transaction")]
Transaction,
/// Item whose payload is kept as raw bytes in an `Attachment`.
#[serde(rename = "attachment")]
Attachment,
/// Item whose payload is parsed as a `SampleProfile`.
#[serde(rename = "profile")]
Profile,
}
/// The JSON header line that precedes every item payload.
#[derive(Clone, Debug, Deserialize)]
struct EnvelopeItemHeader {
// Selects how the payload is interpreted.
r#type: EnvelopeItemType,
// Payload size in bytes; when absent the payload runs to the next `\n` or the end of input.
length: Option<usize>,
// The remaining fields are only read when building an `Attachment`.
filename: Option<String>,
attachment_type: Option<AttachmentType>,
content_type: Option<String>,
}
/// A single item carried inside an [`Envelope`].
///
/// Every payload type converts into this enum through `From`, so values can be handed straight
/// to `Envelope::add_item`.
#[derive(Clone, Debug, PartialEq)]
#[non_exhaustive]
// NOTE(review): the variants presumably differ a lot in size and are left unboxed on purpose
// (boxing would change the public variant types) — confirm before removing this allow.
#[allow(clippy::large_enum_variant)]
pub enum EnvelopeItem {
/// An event; serialized with item type `"event"`.
Event(Event<'static>),
/// A single session update; serialized with item type `"session"`.
SessionUpdate(SessionUpdate<'static>),
/// Aggregated session data; serialized with item type `"sessions"`.
SessionAggregates(SessionAggregates<'static>),
/// A transaction; serialized with item type `"transaction"`.
Transaction(Transaction<'static>),
/// An attachment; it writes its own header and payload when serialized.
Attachment(Attachment),
/// A profile; serialized with item type `"profile"`.
Profile(SampleProfile),
}
impl From<Event<'static>> for EnvelopeItem {
fn from(event: Event<'static>) -> Self {
EnvelopeItem::Event(event)
}
}
impl From<SessionUpdate<'static>> for EnvelopeItem {
fn from(session: SessionUpdate<'static>) -> Self {
EnvelopeItem::SessionUpdate(session)
}
}
impl From<SessionAggregates<'static>> for EnvelopeItem {
fn from(aggregates: SessionAggregates<'static>) -> Self {
EnvelopeItem::SessionAggregates(aggregates)
}
}
impl From<Transaction<'static>> for EnvelopeItem {
fn from(transaction: Transaction<'static>) -> Self {
EnvelopeItem::Transaction(transaction)
}
}
impl From<Attachment> for EnvelopeItem {
fn from(attachment: Attachment) -> Self {
EnvelopeItem::Attachment(attachment)
}
}
impl From<SampleProfile> for EnvelopeItem {
fn from(profile: SampleProfile) -> Self {
EnvelopeItem::Profile(profile)
}
}
/// Borrowing iterator over the items of an [`Envelope`], returned by `Envelope::items`.
#[derive(Clone)]
pub struct EnvelopeItemIter<'s> {
// Iterator over the envelope's item vector; `next` simply forwards to it.
inner: std::slice::Iter<'s, EnvelopeItem>,
}
/// Yields references to the envelope's items in the order of the underlying slice.
impl<'s> Iterator for EnvelopeItemIter<'s> {
    type Item = &'s EnvelopeItem;

    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }

    /// Forwards the exact bounds of the underlying slice iterator so that consumers such as
    /// `collect` can pre-allocate instead of seeing the default `(0, None)`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.inner.size_hint()
    }
}
/// An ordered collection of [`EnvelopeItem`]s together with an optional event id.
///
/// The `Default` value is an envelope with no id and no items.
#[derive(Clone, Default, Debug, PartialEq)]
pub struct Envelope {
// Set from the envelope header when parsing, or from the first event/transaction added while
// it is still unset.
event_id: Option<Uuid>,
// Items in the order they were added.
items: Vec<EnvelopeItem>,
}
impl Envelope {
pub fn new() -> Envelope {
Default::default()
}
/// Appends an item to the envelope.
///
/// While the envelope has no event id yet, adding an event or a transaction adopts that item's
/// `event_id`. Other item kinds never change the id, and an id that is already set is kept.
pub fn add_item<I>(&mut self, item: I)
where
    I: Into<EnvelopeItem>,
{
    let item = item.into();
    if self.event_id.is_none() {
        self.event_id = match &item {
            EnvelopeItem::Event(event) => Some(event.event_id),
            EnvelopeItem::Transaction(transaction) => Some(transaction.event_id),
            _ => None,
        };
    }
    self.items.push(item);
}
/// Returns an iterator over the items of this envelope, in the order they were added.
///
/// The returned iterator borrows from `self`; the `'_` in the return type makes that borrow
/// visible in the signature.
pub fn items(&self) -> EnvelopeItemIter<'_> {
    let inner = self.items.iter();
    EnvelopeItemIter { inner }
}
pub fn uuid(&self) -> Option<&Uuid> {
self.event_id.as_ref()
}
/// Returns the first [`EnvelopeItem::Event`] payload in the envelope, if there is one.
pub fn event(&self) -> Option<&Event<'static>> {
    self.items.iter().find_map(|item| match item {
        EnvelopeItem::Event(event) => Some(event),
        _ => None,
    })
}
/// Consumes the envelope and rebuilds it from the items accepted by `predicate`.
///
/// The result starts out empty, so its event id is re-derived only from the events and
/// transactions that were kept. If none was kept, attachments are dropped as well. Returns
/// `None` when no item is left.
pub fn filter<P>(self, mut predicate: P) -> Option<Self>
where
    P: FnMut(&EnvelopeItem) -> bool,
{
    let mut kept = Envelope::new();
    self.items
        .into_iter()
        .filter(|item| predicate(item))
        .for_each(|item| kept.add_item(item));

    // Without an id (no event or transaction was kept) the attachments are removed too.
    if kept.event_id.is_none() {
        kept.items
            .retain(|item| !matches!(item, EnvelopeItem::Attachment(..)));
    }

    if kept.items.is_empty() {
        return None;
    }
    Some(kept)
}
/// Serializes the envelope into `writer`.
///
/// The output is the envelope header line (`{"event_id":"…"}`, or `{}` without an id) followed,
/// for every item, by an item header line, the payload, and a newline. Attachments write their
/// own header and payload; all other items are serialized to JSON first so that the header can
/// state the payload length.
///
/// # Errors
///
/// Returns any error from `writer`, from the attachment writer, or from JSON serialization.
pub fn to_writer<W>(&self, mut writer: W) -> std::io::Result<()>
where
    W: Write,
{
    if let Some(uuid) = self.uuid() {
        writeln!(writer, r#"{{"event_id":"{}"}}"#, uuid)?;
    } else {
        writeln!(writer, "{{}}")?;
    }

    // Scratch buffer reused for every JSON payload; its length goes into the item header.
    let mut payload = Vec::new();
    for item in &self.items {
        let type_name = match item {
            EnvelopeItem::Event(event) => {
                serde_json::to_writer(&mut payload, event)?;
                "event"
            }
            EnvelopeItem::SessionUpdate(session) => {
                serde_json::to_writer(&mut payload, session)?;
                "session"
            }
            EnvelopeItem::SessionAggregates(aggregates) => {
                serde_json::to_writer(&mut payload, aggregates)?;
                "sessions"
            }
            EnvelopeItem::Transaction(transaction) => {
                serde_json::to_writer(&mut payload, transaction)?;
                "transaction"
            }
            EnvelopeItem::Profile(profile) => {
                serde_json::to_writer(&mut payload, profile)?;
                "profile"
            }
            EnvelopeItem::Attachment(attachment) => {
                // Attachments emit header and payload themselves; only the newline is added.
                attachment.to_writer(&mut writer)?;
                writeln!(writer)?;
                continue;
            }
        };

        writeln!(
            writer,
            r#"{{"type":"{}","length":{}}}"#,
            type_name,
            payload.len()
        )?;
        writer.write_all(&payload)?;
        writeln!(writer)?;
        payload.clear();
    }
    Ok(())
}
pub fn from_slice(slice: &[u8]) -> Result<Envelope, EnvelopeError> {
let (header, offset) = Self::parse_header(slice)?;
let items = Self::parse_items(slice, offset)?;
let mut envelope = Envelope {
event_id: header.event_id,
..Default::default()
};
for item in items {
envelope.add_item(item);
}
Ok(envelope)
}
/// Reads the file at `path` and parses its contents as an envelope.
///
/// # Errors
///
/// Any failure to read the file is reported as [`EnvelopeError::UnexpectedEof`]; the underlying
/// I/O error is discarded. Parse failures are returned as produced by `from_slice`.
pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Envelope, EnvelopeError> {
    match std::fs::read(path) {
        Ok(contents) => Envelope::from_slice(&contents),
        Err(_) => Err(EnvelopeError::UnexpectedEof),
    }
}
/// Parses the envelope header from the start of `slice`.
///
/// Returns the header and the offset of the first byte after the header's terminator.
///
/// # Errors
///
/// - [`EnvelopeError::MissingHeader`] if no JSON value is found.
/// - [`EnvelopeError::InvalidHeader`] if the value does not deserialize as a header.
/// - [`EnvelopeError::MissingNewline`] if the header is followed by a byte other than `\n`.
fn parse_header(slice: &[u8]) -> Result<(EnvelopeHeader, usize), EnvelopeError> {
    let mut stream = serde_json::Deserializer::from_slice(slice).into_iter::<EnvelopeHeader>();
    let header = stream
        .next()
        .ok_or(EnvelopeError::MissingHeader)?
        .map_err(EnvelopeError::InvalidHeader)?;

    let header_end = stream.byte_offset();
    Self::require_termination(slice, header_end)?;
    // Skip the terminator; at end of input this points one past the slice.
    Ok((header, header_end + 1))
}
/// Parses consecutive items from `slice`, starting at `offset`, until the input is used up.
///
/// # Errors
///
/// Returns the first error reported by `parse_item`, or [`EnvelopeError::MissingItemHeader`]
/// if the remaining input cannot be sliced at the current offset.
fn parse_items(slice: &[u8], mut offset: usize) -> Result<Vec<EnvelopeItem>, EnvelopeError> {
    let mut parsed = Vec::new();
    loop {
        if offset >= slice.len() {
            break;
        }
        let remaining = match slice.get(offset..) {
            Some(remaining) => remaining,
            None => return Err(EnvelopeError::MissingItemHeader),
        };
        let (item, consumed) = Self::parse_item(remaining)?;
        parsed.push(item);
        offset += consumed;
    }
    Ok(parsed)
}
/// Parses a single item (header line plus payload) from the start of `slice`.
///
/// Returns the item together with the number of bytes to advance past it. That count always
/// includes one byte for the terminator after the payload, even when the payload ends at the
/// end of the input without a newline.
///
/// With a `length` in the header the payload is exactly that many bytes. Without it the payload
/// runs up to the next `\n`, or to the end of the input.
///
/// # Errors
///
/// - [`EnvelopeError::UnexpectedEof`] if no item header is found, or if the declared `length`
///   reaches past the end of `slice` (including lengths too large to form a valid offset).
/// - [`EnvelopeError::InvalidItemHeader`] if the header does not deserialize.
/// - [`EnvelopeError::MissingNewline`] if the header or a length-delimited payload is followed
///   by a byte other than `\n`.
/// - [`EnvelopeError::InvalidItemPayload`] if a JSON payload does not deserialize.
fn parse_item(slice: &[u8]) -> Result<(EnvelopeItem, usize), EnvelopeError> {
    let mut stream = serde_json::Deserializer::from_slice(slice).into_iter();
    let header: EnvelopeItemHeader = match stream.next() {
        None => return Err(EnvelopeError::UnexpectedEof),
        Some(Err(error)) => return Err(EnvelopeError::InvalidItemHeader(error)),
        Some(Ok(header)) => header,
    };

    let header_end = stream.byte_offset();
    Self::require_termination(slice, header_end)?;

    // Clamp so that a header ending exactly at the end of input yields an empty payload.
    let payload_start = std::cmp::min(header_end + 1, slice.len());
    let payload_end = match header.length {
        Some(len) => {
            // `len` comes straight from the input: a huge value must be rejected rather than
            // overflow the addition and slip past the bounds check below.
            let payload_end = payload_start
                .checked_add(len)
                .ok_or(EnvelopeError::UnexpectedEof)?;
            if slice.len() < payload_end {
                return Err(EnvelopeError::UnexpectedEof);
            }
            Self::require_termination(slice, payload_end)?;
            payload_end
        }
        None => slice
            .get(payload_start..)
            .and_then(|rest| rest.iter().position(|&b| b == b'\n'))
            .map_or(slice.len(), |relative_end| payload_start + relative_end),
    };

    // Both bounds were validated above; report an error instead of panicking regardless.
    let payload = slice
        .get(payload_start..payload_end)
        .ok_or(EnvelopeError::UnexpectedEof)?;

    let item = match header.r#type {
        EnvelopeItemType::Event => serde_json::from_slice(payload).map(EnvelopeItem::Event),
        EnvelopeItemType::Transaction => {
            serde_json::from_slice(payload).map(EnvelopeItem::Transaction)
        }
        EnvelopeItemType::SessionUpdate => {
            serde_json::from_slice(payload).map(EnvelopeItem::SessionUpdate)
        }
        EnvelopeItemType::SessionAggregates => {
            serde_json::from_slice(payload).map(EnvelopeItem::SessionAggregates)
        }
        // Attachments keep the raw payload bytes; the metadata comes from the item header.
        EnvelopeItemType::Attachment => Ok(EnvelopeItem::Attachment(Attachment {
            buffer: payload.to_owned(),
            filename: header.filename.unwrap_or_default(),
            content_type: header.content_type,
            ty: header.attachment_type,
        })),
        EnvelopeItemType::Profile => serde_json::from_slice(payload).map(EnvelopeItem::Profile),
    }
    .map_err(EnvelopeError::InvalidItemPayload)?;

    Ok((item, payload_end + 1))
}
/// Checks that position `offset` in `slice` is either a `\n` or lies past the end of the input.
///
/// # Errors
///
/// Returns [`EnvelopeError::MissingNewline`] if any other byte is found at `offset`.
fn require_termination(slice: &[u8], offset: usize) -> Result<(), EnvelopeError> {
    let terminated = slice.get(offset).map_or(true, |&byte| byte == b'\n');
    if terminated {
        Ok(())
    } else {
        Err(EnvelopeError::MissingNewline)
    }
}
}
impl From<Event<'static>> for Envelope {
fn from(event: Event<'static>) -> Self {
let mut envelope = Self::default();
envelope.add_item(event);
envelope
}
}
impl From<Transaction<'static>> for Envelope {
fn from(transaction: Transaction<'static>) -> Self {
let mut envelope = Self::default();
envelope.add_item(transaction);
envelope
}
}
#[cfg(test)]
mod test {
use std::str::FromStr;
use std::time::{Duration, SystemTime};
use time::format_description::well_known::Rfc3339;
use time::OffsetDateTime;
use super::*;
use crate::protocol::v7::{Level, SessionAttributes, SessionStatus, Span};
// Serializes `envelope` into memory and returns the bytes as a (lossily decoded) string.
fn to_str(envelope: Envelope) -> String {
    let mut serialized = Vec::new();
    envelope.to_writer(&mut serialized).unwrap();
    String::from_utf8_lossy(&serialized).into_owned()
}
// Parses an RFC 3339 timestamp into a `SystemTime`; panics on malformed input.
fn timestamp(s: &str) -> SystemTime {
    let parsed = OffsetDateTime::parse(s, &Rfc3339).unwrap();
    let since_epoch = Duration::new(parsed.unix_timestamp() as u64, parsed.nanosecond());
    SystemTime::UNIX_EPOCH.checked_add(since_epoch).unwrap()
}
// An envelope without id or items serializes to just the empty header line.
#[test]
fn test_empty() {
    let serialized = to_str(Envelope::new());
    assert_eq!(serialized, "{}\n");
}
// A single event yields the id header, an item header with the payload length, and the payload.
#[test]
fn test_event() {
    let event = Event {
        event_id: Uuid::parse_str("22d00b3f-d1b1-4b5d-8d20-49d138cd8a9c").unwrap(),
        timestamp: timestamp("2020-07-20T14:51:14.296Z"),
        ..Default::default()
    };
    let serialized = to_str(Envelope::from(event));
    let expected = r#"{"event_id":"22d00b3f-d1b1-4b5d-8d20-49d138cd8a9c"}
{"type":"event","length":74}
{"event_id":"22d00b3fd1b14b5d8d2049d138cd8a9c","timestamp":1595256674.296}
"#;
    assert_eq!(serialized, expected)
}
// A session update alone leaves the envelope without an id, so the header line is `{}`.
#[test]
fn test_session() {
    let attributes = SessionAttributes {
        release: "foo-bar@1.2.3".into(),
        environment: Some("production".into()),
        ip_address: None,
        user_agent: None,
    };
    let update = SessionUpdate {
        session_id: Uuid::parse_str("22d00b3f-d1b1-4b5d-8d20-49d138cd8a9c").unwrap(),
        distinct_id: Some("foo@bar.baz".to_owned()),
        sequence: None,
        timestamp: None,
        started: timestamp("2020-07-20T14:51:14.296Z"),
        init: true,
        duration: Some(1.234),
        status: SessionStatus::Ok,
        errors: 123,
        attributes,
    };

    let mut envelope = Envelope::new();
    envelope.add_item(update);

    let expected = r#"{}
{"type":"session","length":222}
{"sid":"22d00b3f-d1b1-4b5d-8d20-49d138cd8a9c","did":"foo@bar.baz","started":"2020-07-20T14:51:14.296Z","init":true,"duration":1.234,"status":"ok","errors":123,"attrs":{"release":"foo-bar@1.2.3","environment":"production"}}
"#;
    assert_eq!(to_str(envelope), expected)
}
// A transaction with one span; the envelope id is taken from the transaction.
#[test]
fn test_transaction() {
    let started_at = timestamp("2020-07-20T14:51:14.296Z");
    let transaction = Transaction {
        event_id: Uuid::parse_str("22d00b3f-d1b1-4b5d-8d20-49d138cd8a9c").unwrap(),
        start_timestamp: started_at,
        spans: vec![Span {
            span_id: "d42cee9fc3e74f5c".parse().unwrap(),
            trace_id: "335e53d614474acc9f89e632b776cc28".parse().unwrap(),
            start_timestamp: started_at,
            ..Default::default()
        }],
        ..Default::default()
    };
    let serialized = to_str(Envelope::from(transaction));
    let expected = r#"{"event_id":"22d00b3f-d1b1-4b5d-8d20-49d138cd8a9c"}
{"type":"transaction","length":200}
{"event_id":"22d00b3fd1b14b5d8d2049d138cd8a9c","start_timestamp":1595256674.296,"spans":[{"span_id":"d42cee9fc3e74f5c","trace_id":"335e53d614474acc9f89e632b776cc28","start_timestamp":1595256674.296}]}
"#;
    assert_eq!(serialized, expected)
}
// An event followed by an attachment; the attachment writes its own header line.
#[test]
fn test_event_with_attachment() {
    let mut envelope = Envelope::from(Event {
        event_id: Uuid::parse_str("22d00b3f-d1b1-4b5d-8d20-49d138cd8a9c").unwrap(),
        timestamp: timestamp("2020-07-20T14:51:14.296Z"),
        ..Default::default()
    });
    envelope.add_item(Attachment {
        buffer: b"some content".to_vec(),
        filename: "file.txt".to_owned(),
        ..Default::default()
    });

    let expected = r#"{"event_id":"22d00b3f-d1b1-4b5d-8d20-49d138cd8a9c"}
{"type":"event","length":74}
{"event_id":"22d00b3fd1b14b5d8d2049d138cd8a9c","timestamp":1595256674.296}
{"type":"attachment","length":12,"filename":"file.txt","attachment_type":"event.attachment","content_type":"application/octet-stream"}
some content
"#;
    assert_eq!(to_str(envelope), expected)
}
// A header without a trailing newline and without items parses to an empty envelope.
#[test]
fn test_deserialize_envelope_empty() {
    let input: &[u8] = b"{\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\"}";
    let envelope = Envelope::from_slice(input).unwrap();

    let expected_id = Uuid::from_str("9ec79c33ec9942ab8353589fcb2e04dc").unwrap();
    assert_eq!(envelope.event_id, Some(expected_id));
    assert_eq!(envelope.items().count(), 0);
}
// A header followed only by a newline still parses to an envelope without items.
#[test]
fn test_deserialize_envelope_empty_newline() {
    let input: &[u8] = b"{\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\"}\n";
    let item_count = Envelope::from_slice(input).unwrap().items().count();
    assert_eq!(item_count, 0);
}
// Two zero-length attachments, the last one terminated by a newline.
#[test]
fn test_deserialize_envelope_empty_item_newline() {
    let input: &[u8] = b"\
        {\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\"}\n\
        {\"type\":\"attachment\",\"length\":0}\n\
        \n\
        {\"type\":\"attachment\",\"length\":0}\n\
        ";
    let envelope = Envelope::from_slice(input).unwrap();
    assert_eq!(envelope.items().count(), 2);

    // Exactly two items exist (asserted above); both must be empty attachments.
    for item in envelope.items() {
        match item {
            EnvelopeItem::Attachment(attachment) => assert_eq!(attachment.buffer.len(), 0),
            _ => panic!("invalid item type"),
        }
    }
}
// Two zero-length attachments, the last header ending exactly at the end of input.
#[test]
fn test_deserialize_envelope_empty_item_eof() {
    let input: &[u8] = b"\
        {\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\"}\n\
        {\"type\":\"attachment\",\"length\":0}\n\
        \n\
        {\"type\":\"attachment\",\"length\":0}\
        ";
    let envelope = Envelope::from_slice(input).unwrap();
    assert_eq!(envelope.items().count(), 2);

    // Exactly two items exist (asserted above); both must be empty attachments.
    for item in envelope.items() {
        match item {
            EnvelopeItem::Attachment(attachment) => assert_eq!(attachment.buffer.len(), 0),
            _ => panic!("invalid item type"),
        }
    }
}
// Without `length` in the item header the payload runs up to the next newline.
#[test]
fn test_deserialize_envelope_implicit_length() {
    let input: &[u8] = b"\
        {\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\"}\n\
        {\"type\":\"attachment\"}\n\
        helloworld\n\
        ";
    let envelope = Envelope::from_slice(input).unwrap();
    assert_eq!(envelope.items().count(), 1);

    match envelope.items().next().unwrap() {
        EnvelopeItem::Attachment(attachment) => assert_eq!(attachment.buffer.len(), 10),
        _ => panic!("invalid item type"),
    }
}
// Without `length` and without a trailing newline the payload runs to the end of input.
#[test]
fn test_deserialize_envelope_implicit_length_eof() {
    let input: &[u8] = b"\
        {\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\"}\n\
        {\"type\":\"attachment\"}\n\
        helloworld\
        ";
    let envelope = Envelope::from_slice(input).unwrap();
    assert_eq!(envelope.items().count(), 1);

    match envelope.items().next().unwrap() {
        EnvelopeItem::Attachment(attachment) => assert_eq!(attachment.buffer.len(), 10),
        _ => panic!("invalid item type"),
    }
}
// An item header at the very end of input, without `length`, yields an empty payload.
#[test]
fn test_deserialize_envelope_implicit_length_empty_eof() {
    let input: &[u8] = b"\
        {\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\"}\n\
        {\"type\":\"attachment\"}\
        ";
    let envelope = Envelope::from_slice(input).unwrap();
    assert_eq!(envelope.items().count(), 1);

    match envelope.items().next().unwrap() {
        EnvelopeItem::Attachment(attachment) => assert_eq!(attachment.buffer.len(), 0),
        _ => panic!("invalid item type"),
    }
}
// A length-delimited binary attachment (containing newlines) followed by a JSON event.
#[test]
fn test_deserialize_envelope_multiple_items() {
    let input: &[u8] = b"\
        {\"event_id\":\"9ec79c33ec9942ab8353589fcb2e04dc\"}\n\
        {\"type\":\"attachment\",\"length\":10,\"content_type\":\"text/plain\",\"filename\":\"hello.txt\"}\n\
        \xef\xbb\xbfHello\r\n\n\
        {\"type\":\"event\",\"length\":41,\"content_type\":\"application/json\",\"filename\":\"application.log\"}\n\
        {\"message\":\"hello world\",\"level\":\"error\"}\n\
        ";
    let envelope = Envelope::from_slice(input).unwrap();
    assert_eq!(envelope.items().count(), 2);

    let mut items = envelope.items();
    match items.next().unwrap() {
        EnvelopeItem::Attachment(attachment) => {
            assert_eq!(attachment.buffer.len(), 10);
            assert_eq!(attachment.buffer, b"\xef\xbb\xbfHello\r\n");
            assert_eq!(attachment.filename, "hello.txt");
            assert_eq!(attachment.content_type, Some("text/plain".to_string()));
        }
        _ => panic!("invalid item type"),
    }
    match items.next().unwrap() {
        EnvelopeItem::Event(event) => {
            assert_eq!(event.message, Some("hello world".to_string()));
            assert_eq!(event.level, Level::Error);
        }
        _ => panic!("invalid item type"),
    }
}
// Round trip: serialize, parse, serialize again, and expect identical output.
#[test]
fn test_deserialize_serialized() {
    let moment = "2020-07-20T14:51:14.296Z";

    let mut envelope = Envelope::new();
    envelope.add_item(Event {
        event_id: Uuid::parse_str("22d00b3f-d1b1-4b5d-8d20-49d138cd8a9c").unwrap(),
        timestamp: timestamp(moment),
        ..Default::default()
    });
    envelope.add_item(Transaction {
        event_id: Uuid::parse_str("22d00b3f-d1b1-4b5d-8d20-49d138cd8a9d").unwrap(),
        start_timestamp: timestamp(moment),
        spans: vec![Span {
            span_id: "d42cee9fc3e74f5c".parse().unwrap(),
            trace_id: "335e53d614474acc9f89e632b776cc28".parse().unwrap(),
            start_timestamp: timestamp(moment),
            ..Default::default()
        }],
        ..Default::default()
    });
    envelope.add_item(SessionUpdate {
        session_id: Uuid::parse_str("22d00b3f-d1b1-4b5d-8d20-49d138cd8a9c").unwrap(),
        distinct_id: Some("foo@bar.baz".to_owned()),
        sequence: None,
        timestamp: None,
        started: timestamp(moment),
        init: true,
        duration: Some(1.234),
        status: SessionStatus::Ok,
        errors: 123,
        attributes: SessionAttributes {
            release: "foo-bar@1.2.3".into(),
            environment: Some("production".into()),
            ip_address: None,
            user_agent: None,
        },
    });
    envelope.add_item(Attachment {
        buffer: b"some content".to_vec(),
        filename: "file.txt".to_owned(),
        ..Default::default()
    });

    let serialized = to_str(envelope);
    let reparsed = Envelope::from_slice(serialized.as_bytes()).unwrap();
    assert_eq!(serialized, to_str(reparsed))
}
}