1#[cfg(feature = "release-health")]
6pub use session_impl::*;
7
8#[cfg(feature = "release-health")]
9mod session_impl {
10
11 use std::collections::HashMap;
12 use std::sync::{Arc, Condvar, Mutex, MutexGuard};
13 use std::thread::JoinHandle;
14 use std::time::{Duration, Instant, SystemTime};
15
16 use crate::client::TransportArc;
17 use crate::clientoptions::SessionMode;
18 use crate::protocol::{
19 EnvelopeItem, Event, Level, SessionAggregateItem, SessionAggregates, SessionAttributes,
20 SessionStatus, SessionUpdate,
21 };
22
23 #[cfg(feature = "release-health")]
24 use crate::scope::StackLayer;
25
26 #[cfg(feature = "release-health")]
27 use crate::types::random_uuid;
28 use crate::{Client, Envelope};
29
30 #[derive(Clone, Debug)]
31 pub struct Session {
32 client: Arc<Client>,
33 session_update: SessionUpdate<'static>,
34 started: Instant,
35 dirty: bool,
36 }
37
38 impl Drop for Session {
39 fn drop(&mut self) {
40 self.close(SessionStatus::Exited);
41 if self.dirty {
42 self.client.enqueue_session(self.session_update.clone());
43 }
44 }
45 }
46
47 impl Session {
/// Creates a new session from the client and scope of the given stack layer.
///
/// Returns `None` when the layer has no client bound, or when the client
/// options do not specify a `release`.
///
/// The `distinct_id` is taken from the scope's user, preferring `id`, then
/// `email`, then `username`.
#[cfg(feature = "release-health")]
pub fn from_stack(stack: &StackLayer) -> Option<Self> {
    let client = stack.client.as_ref()?;
    let options = client.options();
    // Check the release first: without one no session is created, so there is
    // no point in cloning user data or generating a session id.
    let release = options.release.clone()?;

    let distinct_id = stack.scope.user.as_deref().and_then(|user| {
        user.id
            .as_ref()
            .or(user.email.as_ref())
            .or(user.username.as_ref())
            .cloned()
    });

    let attributes = SessionAttributes {
        release,
        environment: options.environment.clone(),
        ip_address: None,
        user_agent: None,
    };

    Some(Self {
        client: client.clone(),
        session_update: SessionUpdate {
            session_id: random_uuid(),
            distinct_id,
            sequence: None,
            timestamp: None,
            started: SystemTime::now(),
            init: true,
            duration: None,
            status: SessionStatus::Ok,
            errors: 0,
            attributes,
        },
        started: Instant::now(),
        dirty: true,
    })
}
84
85 pub(crate) fn update_from_event(&mut self, event: &Event<'static>) {
86 if self.session_update.status != SessionStatus::Ok {
87 return;
90 }
91 let mut has_error = event.level >= Level::Error;
92 let mut is_crash = false;
93 for exc in &event.exception.values {
94 has_error = true;
95 if let Some(mechanism) = &exc.mechanism {
96 if let Some(false) = mechanism.handled {
97 is_crash = true;
98 break;
99 }
100 }
101 }
102
103 if is_crash {
104 self.session_update.status = SessionStatus::Crashed;
105 }
106 if has_error {
107 self.session_update.errors += 1;
108 self.dirty = true;
109 }
110 }
111
112 pub(crate) fn close(&mut self, status: SessionStatus) {
113 if self.session_update.status == SessionStatus::Ok {
114 let status = match status {
115 SessionStatus::Ok => SessionStatus::Exited,
116 s => s,
117 };
118 self.session_update.duration = Some(self.started.elapsed().as_secs_f64());
119 self.session_update.status = status;
120 self.dirty = true;
121 }
122 }
123
/// Turns the pending session state into an envelope item.
///
/// Returns `None` when nothing changed since the last item was created.
/// After an item has been produced, the `init` flag is cleared so later
/// items are no longer marked as the initial update.
#[cfg(feature = "release-health")]
pub(crate) fn create_envelope_item(&mut self) -> Option<EnvelopeItem> {
    if !self.dirty {
        return None;
    }

    // Snapshot first: the emitted item must still carry the current `init` value.
    let item: EnvelopeItem = self.session_update.clone().into();
    self.session_update.init = false;
    self.dirty = false;
    Some(item)
}
134 }
135
/// Maximum number of individual session updates put into a single envelope.
/// Reaching this many queued updates also triggers an immediate flush.
const MAX_SESSION_ITEMS: usize = 100;
/// Interval at which the background worker flushes queued sessions.
#[cfg(feature = "release-health")]
const FLUSH_INTERVAL: Duration = Duration::from_secs(60);
140
141 #[derive(Debug, Default)]
142 struct SessionQueue {
143 individual: Vec<SessionUpdate<'static>>,
144 aggregated: Option<AggregatedSessions>,
145 }
146
147 #[derive(Debug)]
148 struct AggregatedSessions {
149 buckets: HashMap<AggregationKey, AggregationCounts>,
150 attributes: SessionAttributes<'static>,
151 }
152
153 impl From<AggregatedSessions> for EnvelopeItem {
154 fn from(sessions: AggregatedSessions) -> Self {
155 let aggregates = sessions
156 .buckets
157 .into_iter()
158 .map(|(key, counts)| SessionAggregateItem {
159 started: key.started,
160 distinct_id: key.distinct_id,
161 exited: counts.exited,
162 errored: counts.errored,
163 abnormal: counts.abnormal,
164 crashed: counts.crashed,
165 })
166 .collect();
167
168 SessionAggregates {
169 aggregates,
170 attributes: sessions.attributes,
171 }
172 .into()
173 }
174 }
175
/// Identifies one aggregation bucket.
#[derive(Debug, Hash, PartialEq, Eq)]
struct AggregationKey {
    /// Start time of the bucket.
    started: SystemTime,
    /// Distinct id shared by all sessions in the bucket, if any.
    distinct_id: Option<String>,
}
181
/// Number of sessions per outcome within one aggregation bucket.
#[derive(Default, Debug)]
struct AggregationCounts {
    /// Sessions that exited without any recorded error.
    exited: u32,
    /// Sessions that exited with at least one recorded error.
    errored: u32,
    /// Sessions that ended with status `Abnormal`.
    abnormal: u32,
    /// Sessions that ended with status `Crashed`.
    crashed: u32,
}
189
190 pub(crate) struct SessionFlusher {
196 transport: TransportArc,
197 mode: SessionMode,
198 queue: Arc<Mutex<SessionQueue>>,
199 shutdown: Arc<(Mutex<bool>, Condvar)>,
200 worker: Option<JoinHandle<()>>,
201 }
202
203 impl SessionFlusher {
/// Creates a new flusher and starts its background worker thread.
///
/// The worker thread is named `sentry-session-flusher`. It flushes the queue
/// to `transport` whenever `FLUSH_INTERVAL` has elapsed since its last flush,
/// and exits once the shutdown flag is set.
///
/// # Panics
///
/// Panics if the worker thread cannot be spawned.
#[cfg(feature = "release-health")]
pub fn new(transport: TransportArc, mode: SessionMode) -> Self {
    let queue = Arc::new(Mutex::new(SessionQueue::default()));
    // A `Condvar` can only wait on a `Mutex`, so an atomic flag cannot be used here.
    #[allow(clippy::mutex_atomic)]
    let shutdown = Arc::new((Mutex::new(false), Condvar::new()));

    let worker = {
        let transport = transport.clone();
        let queue = Arc::clone(&queue);
        let shutdown = Arc::clone(&shutdown);

        std::thread::Builder::new()
            .name("sentry-session-flusher".into())
            .spawn(move || {
                let (flag, cvar) = &*shutdown;
                // The flag is checked before the first wait, in case shutdown
                // was already requested by the time this thread starts.
                let mut stopped = flag.lock().unwrap();
                let mut last_flush = Instant::now();

                while !*stopped {
                    let remaining = FLUSH_INTERVAL
                        .checked_sub(last_flush.elapsed())
                        .unwrap_or_default();
                    stopped = cvar.wait_timeout(stopped, remaining).unwrap().0;

                    // Woken up for shutdown, or before the interval elapsed:
                    // re-evaluate the loop condition without flushing.
                    if *stopped || last_flush.elapsed() < FLUSH_INTERVAL {
                        continue;
                    }

                    SessionFlusher::flush_queue_internal(queue.lock().unwrap(), &transport);
                    last_flush = Instant::now();
                }
            })
            .unwrap()
    };

    Self {
        transport,
        mode,
        queue,
        shutdown,
        worker: Some(worker),
    }
}
252
253 pub fn enqueue(&self, session_update: SessionUpdate<'static>) {
258 let mut queue = self.queue.lock().unwrap();
259 if self.mode == SessionMode::Application || !session_update.init {
260 queue.individual.push(session_update);
261 if queue.individual.len() >= MAX_SESSION_ITEMS {
262 SessionFlusher::flush_queue_internal(queue, &self.transport);
263 }
264 return;
265 }
266
267 let aggregate = queue.aggregated.get_or_insert_with(|| AggregatedSessions {
268 buckets: HashMap::with_capacity(1),
269 attributes: session_update.attributes.clone(),
270 });
271
272 let duration = session_update
273 .started
274 .duration_since(SystemTime::UNIX_EPOCH)
275 .unwrap();
276 let duration = (duration.as_secs() / 60) * 60;
277 let started = SystemTime::UNIX_EPOCH
278 .checked_add(Duration::from_secs(duration))
279 .unwrap();
280
281 let key = AggregationKey {
282 started,
283 distinct_id: session_update.distinct_id,
284 };
285
286 let bucket = aggregate.buckets.entry(key).or_default();
287
288 match session_update.status {
289 SessionStatus::Exited => {
290 if session_update.errors > 0 {
291 bucket.errored += 1;
292 } else {
293 bucket.exited += 1;
294 }
295 }
296 SessionStatus::Crashed => {
297 bucket.crashed += 1;
298 }
299 SessionStatus::Abnormal => {
300 bucket.abnormal += 1;
301 }
302 SessionStatus::Ok => {
303 sentry_debug!("unreachable: only closed sessions will be enqueued");
304 }
305 }
306 }
307
308 pub fn flush(&self) {
310 let queue = self.queue.lock().unwrap();
311 SessionFlusher::flush_queue_internal(queue, &self.transport);
312 }
313
314 fn flush_queue_internal(
319 mut queue_lock: MutexGuard<SessionQueue>,
320 transport: &TransportArc,
321 ) {
322 let queue = std::mem::take(&mut queue_lock.individual);
323 let aggregate = queue_lock.aggregated.take();
324 drop(queue_lock);
325
326 if let Some(aggregate) = aggregate {
328 if let Some(ref transport) = *transport.read().unwrap() {
329 let mut envelope = Envelope::new();
330 envelope.add_item(aggregate);
331 transport.send_envelope(envelope);
332 }
333 }
334
335 if queue.is_empty() {
337 return;
338 }
339
340 let mut envelope = Envelope::new();
341 let mut items = 0;
342
343 for session_update in queue {
344 if items >= MAX_SESSION_ITEMS {
345 if let Some(ref transport) = *transport.read().unwrap() {
346 transport.send_envelope(envelope);
347 }
348 envelope = Envelope::new();
349 items = 0;
350 }
351
352 envelope.add_item(session_update);
353 items += 1;
354 }
355
356 if let Some(ref transport) = *transport.read().unwrap() {
357 transport.send_envelope(envelope);
358 }
359 }
360 }
361
362 impl Drop for SessionFlusher {
363 fn drop(&mut self) {
364 let (lock, cvar) = self.shutdown.as_ref();
365 *lock.lock().unwrap() = true;
366 cvar.notify_one();
367
368 if let Some(worker) = self.worker.take() {
369 worker.join().ok();
370 }
371 SessionFlusher::flush_queue_internal(self.queue.lock().unwrap(), &self.transport);
372 }
373 }
374
375 #[cfg(all(test, feature = "test"))]
376 mod tests {
377 use std::cmp::Ordering;
378
379 use super::*;
380 use crate as sentry;
381 use crate::protocol::{Envelope, EnvelopeItem, SessionStatus};
382
383 fn capture_envelopes<F>(f: F) -> Vec<Envelope>
384 where
385 F: FnOnce(),
386 {
387 crate::test::with_captured_envelopes_options(
388 f,
389 crate::ClientOptions {
390 release: Some("some-release".into()),
391 ..Default::default()
392 },
393 )
394 }
395
/// Asserts that a session which is started and never explicitly ended is
/// reported in a single envelope as an exited, error-free initial update.
#[test]
fn test_session_startstop() {
    let envelopes = capture_envelopes(|| {
        sentry::start_session();
        std::thread::sleep(std::time::Duration::from_millis(10));
    });
    assert_eq!(envelopes.len(), 1);

    let mut items = envelopes[0].items();
    match items.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Exited);
            assert!(session.duration.unwrap() > 0.01);
            assert_eq!(session.errors, 0);
            assert_eq!(session.attributes.release, "some-release");
            assert!(session.init);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(items.next(), None);
}
416
/// Asserts that twice `MAX_SESSION_ITEMS` sessions are delivered in exactly
/// two envelopes that contain nothing but session updates.
#[test]
fn test_session_batching() {
    let envelopes = capture_envelopes(|| {
        for _ in 0..(MAX_SESSION_ITEMS * 2) {
            sentry::start_session();
        }
    });
    assert_eq!(envelopes.len(), 2);

    let (first, second) = (&envelopes[0], &envelopes[1]);
    let items = first.items().chain(second.items());
    assert_eq!(items.clone().count(), MAX_SESSION_ITEMS * 2);
    for item in items {
        assert!(matches!(item, EnvelopeItem::SessionUpdate(_)));
    }
}
433
/// Asserts that in `SessionMode::Request` the sessions are reported as one
/// `SessionAggregates` item with one bucket per distinct id, next to a single
/// event envelope.
#[test]
fn test_session_aggregation() {
    let options = crate::ClientOptions {
        release: Some("some-release".into()),
        #[cfg(feature = "release-health")]
        session_mode: SessionMode::Request,
        ..Default::default()
    };

    let envelopes = crate::test::with_captured_envelopes_options(
        || {
            sentry::start_session();
            let err = "NaN".parse::<usize>().unwrap_err();
            sentry::capture_error(&err);

            for _ in 0..50 {
                sentry::start_session();
            }
            sentry::end_session();

            sentry::configure_scope(|scope| {
                scope.set_user(Some(sentry::User {
                    id: Some("foo-bar".into()),
                    ..Default::default()
                }));
                // This processor returns `None` for every event it is given.
                scope.add_event_processor(Box::new(|_| None));
            });

            for _ in 0..50 {
                sentry::start_session();
            }

            let err = "NaN".parse::<usize>().unwrap_err();
            sentry::capture_error(&err);
        },
        options,
    );
    assert_eq!(envelopes.len(), 2);

    let mut items = envelopes[0].items();
    assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
    assert_eq!(items.next(), None);

    let mut items = envelopes[1].items();
    match items.next() {
        Some(EnvelopeItem::SessionAggregates(aggregate)) => {
            let mut aggregates = aggregate.aggregates.clone();
            assert_eq!(aggregates.len(), 2);
            // Sort by distinct id so the bucket without an id comes first.
            aggregates.sort_by(|a, b| {
                a.distinct_id
                    .partial_cmp(&b.distinct_id)
                    .unwrap_or(Ordering::Less)
            });

            let (anonymous, identified) = (&aggregates[0], &aggregates[1]);
            assert_eq!(anonymous.distinct_id, None);
            assert_eq!(anonymous.exited, 50);

            assert_eq!(identified.errored, 0);
            assert_eq!(identified.distinct_id, Some("foo-bar".into()));
            assert_eq!(identified.exited, 50);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(items.next(), None);
}
500
/// Asserts that a captured error is sent together with an initial session
/// update counting one error, followed by a final, non-initial update with
/// status `Exited`.
#[test]
fn test_session_error() {
    let envelopes = capture_envelopes(|| {
        sentry::start_session();

        let err = "NaN".parse::<usize>().unwrap_err();
        sentry::capture_error(&err);
    });
    assert_eq!(envelopes.len(), 2);

    // First envelope: the event plus the initial session update.
    let mut items = envelopes[0].items();
    assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
    match items.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Ok);
            assert_eq!(session.errors, 1);
            assert_eq!(session.attributes.release, "some-release");
            assert!(session.init);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(items.next(), None);

    // Second envelope: the closing session update.
    let mut items = envelopes[1].items();
    match items.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Exited);
            assert_eq!(session.errors, 1);
            assert!(!session.init);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(items.next(), None);
}
533
/// Asserts that ending a session with `SessionStatus::Abnormal` reports that
/// status in a single initial session update.
#[test]
fn test_session_abnormal() {
    let envelopes = capture_envelopes(|| {
        sentry::start_session();
        sentry::end_session_with_status(SessionStatus::Abnormal);
    });
    assert_eq!(envelopes.len(), 1);

    let mut items = envelopes[0].items();
    match items.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Abnormal);
            assert!(session.init);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(items.next(), None);
}
551
/// Asserts that with a `sample_rate` of `0.5` the final session update still
/// counts all 100 captured errors, while the number of envelopes stays
/// between 25 and 75 (exclusive).
#[test]
fn test_session_sampled_errors() {
    let options = crate::ClientOptions {
        release: Some("some-release".into()),
        sample_rate: 0.5,
        ..Default::default()
    };

    let mut envelopes = crate::test::with_captured_envelopes_options(
        || {
            sentry::start_session();

            for _ in 0..100 {
                let err = "NaN".parse::<usize>().unwrap_err();
                sentry::capture_error(&err);
            }
        },
        options,
    );
    assert!(envelopes.len() > 25);
    assert!(envelopes.len() < 75);

    // The last envelope is expected to hold only the closing session update.
    let envelope = envelopes.pop().unwrap();
    let mut items = envelope.items();
    match items.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Exited);
            assert_eq!(session.errors, 100);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(items.next(), None);
}
582
/// Asserts that errors captured on a hub created via `Hub::new_from_top`, and
/// inside a nested scope on that hub, are all counted on the session that was
/// started before the hub was created.
#[test]
fn test_inherit_session_from_top() {
    let envelopes = capture_envelopes(|| {
        sentry::start_session();

        let err = "NaN".parse::<usize>().unwrap_err();
        sentry::capture_error(&err);

        let hub = std::sync::Arc::new(sentry::Hub::new_from_top(sentry::Hub::current()));

        sentry::Hub::run(hub, || {
            let err = "NaN".parse::<usize>().unwrap_err();
            sentry::capture_error(&err);

            sentry::with_scope(
                |_| {},
                || {
                    let err = "NaN".parse::<usize>().unwrap_err();
                    sentry::capture_error(&err);
                },
            );
        });
    });

    assert_eq!(envelopes.len(), 4);

    // The last envelope carries the closing session update.
    let mut items = envelopes[3].items();
    match items.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Exited);
            assert_eq!(session.errors, 3);
            assert!(!session.init);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(items.next(), None);
}
623
/// Asserts that a session started inside a nested scope only counts the error
/// captured within that scope: the two later errors are sent as plain events,
/// and the closing session update still reports a single error.
#[test]
fn test_dont_inherit_session_backwards() {
    let envelopes = capture_envelopes(|| {
        let hub = std::sync::Arc::new(sentry::Hub::new_from_top(sentry::Hub::current()));

        sentry::Hub::run(hub, || {
            sentry::with_scope(
                |_| {},
                || {
                    sentry::start_session();

                    let err = "NaN".parse::<usize>().unwrap_err();
                    sentry::capture_error(&err);
                },
            );

            let err = "NaN".parse::<usize>().unwrap_err();
            sentry::capture_error(&err);
        });

        let err = "NaN".parse::<usize>().unwrap_err();
        sentry::capture_error(&err);
    });

    assert_eq!(envelopes.len(), 4);

    // First envelope: the event plus the initial session update.
    let mut items = envelopes[0].items();
    assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
    match items.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Ok);
            assert_eq!(session.errors, 1);
            assert!(session.init);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(items.next(), None);

    // Second and third envelope: events without any session update.
    for envelope in &envelopes[1..3] {
        let mut items = envelope.items();
        assert!(matches!(items.next(), Some(EnvelopeItem::Event(_))));
        assert_eq!(items.next(), None);
    }

    // Fourth envelope: the closing session update.
    let mut items = envelopes[3].items();
    match items.next() {
        Some(EnvelopeItem::SessionUpdate(session)) => {
            assert_eq!(session.status, SessionStatus::Exited);
            assert_eq!(session.errors, 1);
            assert!(!session.init);
        }
        _ => panic!("expected session"),
    }
    assert_eq!(items.next(), None);
}
684 }
685}