sentry_core/
session.rs

1//! Release Health Sessions
2//!
3//! <https://develop.sentry.dev/sdk/sessions/>
4
5#[cfg(feature = "release-health")]
6pub use session_impl::*;
7
8#[cfg(feature = "release-health")]
9mod session_impl {
10
11    use std::collections::HashMap;
12    use std::sync::{Arc, Condvar, Mutex, MutexGuard};
13    use std::thread::JoinHandle;
14    use std::time::{Duration, Instant, SystemTime};
15
16    use crate::client::TransportArc;
17    use crate::clientoptions::SessionMode;
18    use crate::protocol::{
19        EnvelopeItem, Event, Level, SessionAggregateItem, SessionAggregates, SessionAttributes,
20        SessionStatus, SessionUpdate,
21    };
22
23    #[cfg(feature = "release-health")]
24    use crate::scope::StackLayer;
25
26    #[cfg(feature = "release-health")]
27    use crate::types::random_uuid;
28    use crate::{Client, Envelope};
29
30    #[derive(Clone, Debug)]
31    pub struct Session {
32        client: Arc<Client>,
33        session_update: SessionUpdate<'static>,
34        started: Instant,
35        dirty: bool,
36    }
37
38    impl Drop for Session {
39        fn drop(&mut self) {
40            self.close(SessionStatus::Exited);
41            if self.dirty {
42                self.client.enqueue_session(self.session_update.clone());
43            }
44        }
45    }
46
47    impl Session {
48        #[cfg(feature = "release-health")]
49        pub fn from_stack(stack: &StackLayer) -> Option<Self> {
50            let client = stack.client.as_ref()?;
51            let options = client.options();
52            let user = stack.scope.user.as_deref();
53            let distinct_id = user
54                .and_then(|user| {
55                    user.id
56                        .as_ref()
57                        .or(user.email.as_ref())
58                        .or(user.username.as_ref())
59                })
60                .cloned();
61            Some(Self {
62                client: client.clone(),
63                session_update: SessionUpdate {
64                    session_id: random_uuid(),
65                    distinct_id,
66                    sequence: None,
67                    timestamp: None,
68                    started: SystemTime::now(),
69                    init: true,
70                    duration: None,
71                    status: SessionStatus::Ok,
72                    errors: 0,
73                    attributes: SessionAttributes {
74                        release: options.release.clone()?,
75                        environment: options.environment.clone(),
76                        ip_address: None,
77                        user_agent: None,
78                    },
79                },
80                started: Instant::now(),
81                dirty: true,
82            })
83        }
84
85        pub(crate) fn update_from_event(&mut self, event: &Event<'static>) {
86            if self.session_update.status != SessionStatus::Ok {
87                // a session that has already transitioned to a "terminal" state
88                // should not receive any more updates
89                return;
90            }
91            let mut has_error = event.level >= Level::Error;
92            let mut is_crash = false;
93            for exc in &event.exception.values {
94                has_error = true;
95                if let Some(mechanism) = &exc.mechanism {
96                    if let Some(false) = mechanism.handled {
97                        is_crash = true;
98                        break;
99                    }
100                }
101            }
102
103            if is_crash {
104                self.session_update.status = SessionStatus::Crashed;
105            }
106            if has_error {
107                self.session_update.errors += 1;
108                self.dirty = true;
109            }
110        }
111
112        pub(crate) fn close(&mut self, status: SessionStatus) {
113            if self.session_update.status == SessionStatus::Ok {
114                let status = match status {
115                    SessionStatus::Ok => SessionStatus::Exited,
116                    s => s,
117                };
118                self.session_update.duration = Some(self.started.elapsed().as_secs_f64());
119                self.session_update.status = status;
120                self.dirty = true;
121            }
122        }
123
124        #[cfg(feature = "release-health")]
125        pub(crate) fn create_envelope_item(&mut self) -> Option<EnvelopeItem> {
126            if self.dirty {
127                let item = self.session_update.clone().into();
128                self.session_update.init = false;
129                self.dirty = false;
130                return Some(item);
131            }
132            None
133        }
134    }
135
136    // as defined here: https://develop.sentry.dev/sdk/envelopes/#size-limits
137    const MAX_SESSION_ITEMS: usize = 100;
138    #[cfg(feature = "release-health")]
139    const FLUSH_INTERVAL: Duration = Duration::from_secs(60);
140
141    #[derive(Debug, Default)]
142    struct SessionQueue {
143        individual: Vec<SessionUpdate<'static>>,
144        aggregated: Option<AggregatedSessions>,
145    }
146
147    #[derive(Debug)]
148    struct AggregatedSessions {
149        buckets: HashMap<AggregationKey, AggregationCounts>,
150        attributes: SessionAttributes<'static>,
151    }
152
153    impl From<AggregatedSessions> for EnvelopeItem {
154        fn from(sessions: AggregatedSessions) -> Self {
155            let aggregates = sessions
156                .buckets
157                .into_iter()
158                .map(|(key, counts)| SessionAggregateItem {
159                    started: key.started,
160                    distinct_id: key.distinct_id,
161                    exited: counts.exited,
162                    errored: counts.errored,
163                    abnormal: counts.abnormal,
164                    crashed: counts.crashed,
165                })
166                .collect();
167
168            SessionAggregates {
169                aggregates,
170                attributes: sessions.attributes,
171            }
172            .into()
173        }
174    }
175
176    #[derive(Debug, PartialEq, Eq, Hash)]
177    struct AggregationKey {
178        started: SystemTime,
179        distinct_id: Option<String>,
180    }
181
182    #[derive(Debug, Default)]
183    struct AggregationCounts {
184        exited: u32,
185        errored: u32,
186        abnormal: u32,
187        crashed: u32,
188    }
189
190    /// Background Session Flusher
191    ///
192    /// The background flusher queues session updates for delayed batched sending.
193    /// It has its own background thread that will flush its queue once every
194    /// `FLUSH_INTERVAL`.
195    pub(crate) struct SessionFlusher {
196        transport: TransportArc,
197        mode: SessionMode,
198        queue: Arc<Mutex<SessionQueue>>,
199        shutdown: Arc<(Mutex<bool>, Condvar)>,
200        worker: Option<JoinHandle<()>>,
201    }
202
203    impl SessionFlusher {
204        /// Creates a new Flusher that will submit envelopes to the given `transport`.
205        #[cfg(feature = "release-health")]
206        pub fn new(transport: TransportArc, mode: SessionMode) -> Self {
207            let queue = Arc::new(Mutex::new(Default::default()));
208            #[allow(clippy::mutex_atomic)]
209            let shutdown = Arc::new((Mutex::new(false), Condvar::new()));
210
211            let worker_transport = transport.clone();
212            let worker_queue = queue.clone();
213            let worker_shutdown = shutdown.clone();
214            let worker = std::thread::Builder::new()
215                .name("sentry-session-flusher".into())
216                .spawn(move || {
217                    let (lock, cvar) = worker_shutdown.as_ref();
218                    let mut shutdown = lock.lock().unwrap();
219                    // check this immediately, in case the main thread is already shutting down
220                    if *shutdown {
221                        return;
222                    }
223                    let mut last_flush = Instant::now();
224                    loop {
225                        let timeout = FLUSH_INTERVAL
226                            .checked_sub(last_flush.elapsed())
227                            .unwrap_or_else(|| Duration::from_secs(0));
228                        shutdown = cvar.wait_timeout(shutdown, timeout).unwrap().0;
229                        if *shutdown {
230                            return;
231                        }
232                        if last_flush.elapsed() < FLUSH_INTERVAL {
233                            continue;
234                        }
235                        SessionFlusher::flush_queue_internal(
236                            worker_queue.lock().unwrap(),
237                            &worker_transport,
238                        );
239                        last_flush = Instant::now();
240                    }
241                })
242                .unwrap();
243
244            Self {
245                transport,
246                mode,
247                queue,
248                shutdown,
249                worker: Some(worker),
250            }
251        }
252
253        /// Enqueues a session update for delayed sending.
254        ///
255        /// This will aggregate session counts in request mode, for all sessions
256        /// that were not yet partially sent.
257        pub fn enqueue(&self, session_update: SessionUpdate<'static>) {
258            let mut queue = self.queue.lock().unwrap();
259            if self.mode == SessionMode::Application || !session_update.init {
260                queue.individual.push(session_update);
261                if queue.individual.len() >= MAX_SESSION_ITEMS {
262                    SessionFlusher::flush_queue_internal(queue, &self.transport);
263                }
264                return;
265            }
266
267            let aggregate = queue.aggregated.get_or_insert_with(|| AggregatedSessions {
268                buckets: HashMap::with_capacity(1),
269                attributes: session_update.attributes.clone(),
270            });
271
272            let duration = session_update
273                .started
274                .duration_since(SystemTime::UNIX_EPOCH)
275                .unwrap();
276            let duration = (duration.as_secs() / 60) * 60;
277            let started = SystemTime::UNIX_EPOCH
278                .checked_add(Duration::from_secs(duration))
279                .unwrap();
280
281            let key = AggregationKey {
282                started,
283                distinct_id: session_update.distinct_id,
284            };
285
286            let bucket = aggregate.buckets.entry(key).or_default();
287
288            match session_update.status {
289                SessionStatus::Exited => {
290                    if session_update.errors > 0 {
291                        bucket.errored += 1;
292                    } else {
293                        bucket.exited += 1;
294                    }
295                }
296                SessionStatus::Crashed => {
297                    bucket.crashed += 1;
298                }
299                SessionStatus::Abnormal => {
300                    bucket.abnormal += 1;
301                }
302                SessionStatus::Ok => {
303                    sentry_debug!("unreachable: only closed sessions will be enqueued");
304                }
305            }
306        }
307
308        /// Flushes the queue to the transport.
309        pub fn flush(&self) {
310            let queue = self.queue.lock().unwrap();
311            SessionFlusher::flush_queue_internal(queue, &self.transport);
312        }
313
314        /// Flushes the queue to the transport.
315        ///
316        /// This is a static method as it will be called from both the background
317        /// thread and the main thread on drop.
318        fn flush_queue_internal(
319            mut queue_lock: MutexGuard<SessionQueue>,
320            transport: &TransportArc,
321        ) {
322            let queue = std::mem::take(&mut queue_lock.individual);
323            let aggregate = queue_lock.aggregated.take();
324            drop(queue_lock);
325
326            // send aggregates
327            if let Some(aggregate) = aggregate {
328                if let Some(ref transport) = *transport.read().unwrap() {
329                    let mut envelope = Envelope::new();
330                    envelope.add_item(aggregate);
331                    transport.send_envelope(envelope);
332                }
333            }
334
335            // send individual items
336            if queue.is_empty() {
337                return;
338            }
339
340            let mut envelope = Envelope::new();
341            let mut items = 0;
342
343            for session_update in queue {
344                if items >= MAX_SESSION_ITEMS {
345                    if let Some(ref transport) = *transport.read().unwrap() {
346                        transport.send_envelope(envelope);
347                    }
348                    envelope = Envelope::new();
349                    items = 0;
350                }
351
352                envelope.add_item(session_update);
353                items += 1;
354            }
355
356            if let Some(ref transport) = *transport.read().unwrap() {
357                transport.send_envelope(envelope);
358            }
359        }
360    }
361
362    impl Drop for SessionFlusher {
363        fn drop(&mut self) {
364            let (lock, cvar) = self.shutdown.as_ref();
365            *lock.lock().unwrap() = true;
366            cvar.notify_one();
367
368            if let Some(worker) = self.worker.take() {
369                worker.join().ok();
370            }
371            SessionFlusher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport);
372        }
373    }
374
375    #[cfg(all(test, feature = "test"))]
376    mod tests {
377        use std::cmp::Ordering;
378
379        use super::*;
380        use crate as sentry;
381        use crate::protocol::{Envelope, EnvelopeItem, SessionStatus};
382
383        fn capture_envelopes<F>(f: F) -> Vec<Envelope>
384        where
385            F: FnOnce(),
386        {
387            crate::test::with_captured_envelopes_options(
388                f,
389                crate::ClientOptions {
390                    release: Some("some-release".into()),
391                    ..Default::default()
392                },
393            )
394        }
395
396        #[test]
397        fn test_session_startstop() {
398            let envelopes = capture_envelopes(|| {
399                sentry::start_session();
400                std::thread::sleep(std::time::Duration::from_millis(10));
401            });
402            assert_eq!(envelopes.len(), 1);
403
404            let mut items = envelopes[0].items();
405            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
406                assert_eq!(session.status, SessionStatus::Exited);
407                assert!(session.duration.unwrap() > 0.01);
408                assert_eq!(session.errors, 0);
409                assert_eq!(session.attributes.release, "some-release");
410                assert!(session.init);
411            } else {
412                panic!("expected session");
413            }
414            assert_eq!(items.next(), None);
415        }
416
417        #[test]
418        fn test_session_batching() {
419            let envelopes = capture_envelopes(|| {
420                for _ in 0..(MAX_SESSION_ITEMS * 2) {
421                    sentry::start_session();
422                }
423            });
424            // we only want *two* envelope for all the sessions
425            assert_eq!(envelopes.len(), 2);
426
427            let items = envelopes[0].items().chain(envelopes[1].items());
428            assert_eq!(items.clone().count(), MAX_SESSION_ITEMS * 2);
429            for item in items {
430                assert!(matches!(item, EnvelopeItem::SessionUpdate(_)));
431            }
432        }
433
434        #[test]
435        fn test_session_aggregation() {
436            let envelopes = crate::test::with_captured_envelopes_options(
437                || {
438                    sentry::start_session();
439                    let err = "NaN".parse::<usize>().unwrap_err();
440                    sentry::capture_error(&err);
441
442                    for _ in 0..50 {
443                        sentry::start_session();
444                    }
445                    sentry::end_session();
446
447                    sentry::configure_scope(|scope| {
448                        scope.set_user(Some(sentry::User {
449                            id: Some("foo-bar".into()),
450                            ..Default::default()
451                        }));
452                        scope.add_event_processor(Box::new(|_| None));
453                    });
454
455                    for _ in 0..50 {
456                        sentry::start_session();
457                    }
458
459                    // This error will be discarded because of the event processor,
460                    // and session will not be updated.
461                    // Only events dropped due to sampling should update the session.
462                    let err = "NaN".parse::<usize>().unwrap_err();
463                    sentry::capture_error(&err);
464                },
465                crate::ClientOptions {
466                    release: Some("some-release".into()),
467                    #[cfg(feature = "release-health")]
468                    session_mode: SessionMode::Request,
469                    ..Default::default()
470                },
471            );
472            assert_eq!(envelopes.len(), 2);
473
474            let mut items = envelopes[0].items();
475            assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
476            assert_eq!(items.next(), None);
477
478            let mut items = envelopes[1].items();
479            if let Some(EnvelopeItem::SessionAggregates(aggregate)) = items.next() {
480                let mut aggregates = aggregate.aggregates.clone();
481                assert_eq!(aggregates.len(), 2);
482                // the order depends on a hashmap and is not stable otherwise
483                aggregates.sort_by(|a, b| {
484                    a.distinct_id
485                        .partial_cmp(&b.distinct_id)
486                        .unwrap_or(Ordering::Less)
487                });
488
489                assert_eq!(aggregates[0].distinct_id, None);
490                assert_eq!(aggregates[0].exited, 50);
491
492                assert_eq!(aggregates[1].errored, 0);
493                assert_eq!(aggregates[1].distinct_id, Some("foo-bar".into()));
494                assert_eq!(aggregates[1].exited, 50);
495            } else {
496                panic!("expected session");
497            }
498            assert_eq!(items.next(), None);
499        }
500
501        #[test]
502        fn test_session_error() {
503            let envelopes = capture_envelopes(|| {
504                sentry::start_session();
505
506                let err = "NaN".parse::<usize>().unwrap_err();
507                sentry::capture_error(&err);
508            });
509            assert_eq!(envelopes.len(), 2);
510
511            let mut items = envelopes[0].items();
512            assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
513            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
514                assert_eq!(session.status, SessionStatus::Ok);
515                assert_eq!(session.errors, 1);
516                assert_eq!(session.attributes.release, "some-release");
517                assert!(session.init);
518            } else {
519                panic!("expected session");
520            }
521            assert_eq!(items.next(), None);
522
523            let mut items = envelopes[1].items();
524            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
525                assert_eq!(session.status, SessionStatus::Exited);
526                assert_eq!(session.errors, 1);
527                assert!(!session.init);
528            } else {
529                panic!("expected session");
530            }
531            assert_eq!(items.next(), None);
532        }
533
534        #[test]
535        fn test_session_abnormal() {
536            let envelopes = capture_envelopes(|| {
537                sentry::start_session();
538                sentry::end_session_with_status(SessionStatus::Abnormal);
539            });
540            assert_eq!(envelopes.len(), 1);
541
542            let mut items = envelopes[0].items();
543            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
544                assert_eq!(session.status, SessionStatus::Abnormal);
545                assert!(session.init);
546            } else {
547                panic!("expected session");
548            }
549            assert_eq!(items.next(), None);
550        }
551
552        #[test]
553        fn test_session_sampled_errors() {
554            let mut envelopes = crate::test::with_captured_envelopes_options(
555                || {
556                    sentry::start_session();
557
558                    for _ in 0..100 {
559                        let err = "NaN".parse::<usize>().unwrap_err();
560                        sentry::capture_error(&err);
561                    }
562                },
563                crate::ClientOptions {
564                    release: Some("some-release".into()),
565                    sample_rate: 0.5,
566                    ..Default::default()
567                },
568            );
569            assert!(envelopes.len() > 25);
570            assert!(envelopes.len() < 75);
571
572            let envelope = envelopes.pop().unwrap();
573            let mut items = envelope.items();
574            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
575                assert_eq!(session.status, SessionStatus::Exited);
576                assert_eq!(session.errors, 100);
577            } else {
578                panic!("expected session");
579            }
580            assert_eq!(items.next(), None);
581        }
582
583        /// For _user-mode_ sessions, we want to inherit the session for any _new_
584        /// Hub that is spawned from the main thread Hub which already has a session
585        /// attached
586        #[test]
587        fn test_inherit_session_from_top() {
588            let envelopes = capture_envelopes(|| {
589                sentry::start_session();
590
591                let err = "NaN".parse::<usize>().unwrap_err();
592                sentry::capture_error(&err);
593
594                // create a new Hub which should have the same session
595                let hub = std::sync::Arc::new(sentry::Hub::new_from_top(sentry::Hub::current()));
596
597                sentry::Hub::run(hub, || {
598                    let err = "NaN".parse::<usize>().unwrap_err();
599                    sentry::capture_error(&err);
600
601                    sentry::with_scope(
602                        |_| {},
603                        || {
604                            let err = "NaN".parse::<usize>().unwrap_err();
605                            sentry::capture_error(&err);
606                        },
607                    );
608                });
609            });
610
611            assert_eq!(envelopes.len(), 4); // 3 errors and one session end
612
613            let mut items = envelopes[3].items();
614            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
615                assert_eq!(session.status, SessionStatus::Exited);
616                assert_eq!(session.errors, 3);
617                assert!(!session.init);
618            } else {
619                panic!("expected session");
620            }
621            assert_eq!(items.next(), None);
622        }
623
624        /// We want to forward-inherit sessions as the previous test asserted, but
625        /// not *backwards*. So any new session created in a derived Hub and scope
626        /// will only get updates from that particular scope.
627        #[test]
628        fn test_dont_inherit_session_backwards() {
629            let envelopes = capture_envelopes(|| {
630                let hub = std::sync::Arc::new(sentry::Hub::new_from_top(sentry::Hub::current()));
631
632                sentry::Hub::run(hub, || {
633                    sentry::with_scope(
634                        |_| {},
635                        || {
636                            sentry::start_session();
637
638                            let err = "NaN".parse::<usize>().unwrap_err();
639                            sentry::capture_error(&err);
640                        },
641                    );
642
643                    let err = "NaN".parse::<usize>().unwrap_err();
644                    sentry::capture_error(&err);
645                });
646
647                let err = "NaN".parse::<usize>().unwrap_err();
648                sentry::capture_error(&err);
649            });
650
651            assert_eq!(envelopes.len(), 4); // 3 errors and one session end
652
653            let mut items = envelopes[0].items();
654            assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
655            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
656                assert_eq!(session.status, SessionStatus::Ok);
657                assert_eq!(session.errors, 1);
658                assert!(session.init);
659            } else {
660                panic!("expected session");
661            }
662            assert_eq!(items.next(), None);
663
664            // the other two events should not have session updates
665            let mut items = envelopes[1].items();
666            assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
667            assert_eq!(items.next(), None);
668
669            let mut items = envelopes[2].items();
670            assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
671            assert_eq!(items.next(), None);
672
673            // the session end is sent last as it is possibly batched
674            let mut items = envelopes[3].items();
675            if let Some(EnvelopeItem::SessionUpdate(session)) = items.next() {
676                assert_eq!(session.status, SessionStatus::Exited);
677                assert_eq!(session.errors, 1);
678                assert!(!session.init);
679            } else {
680                panic!("expected session");
681            }
682            assert_eq!(items.next(), None);
683        }
684    }
685}