console_subscriber/
record.rs
1use console_api as proto;
2use crossbeam_channel::{Receiver, Sender};
3use serde::{
4 ser::{SerializeSeq, SerializeStruct},
5 Serialize,
6};
7use std::{fs::File, io, path::Path, time::SystemTime};
8
9const DATA_FORMAT_VERSION: u8 = 1;
16
17pub(crate) struct Recorder {
18 tx: Sender<Event>,
19 _worker: std::thread::JoinHandle<()>,
21}
22
23#[derive(Serialize)]
24struct Header {
25 v: u8,
26}
27
28#[derive(Serialize)]
29pub(crate) enum Event {
30 Spawn {
31 id: u64,
32 at: SystemTime,
33 fields: SerializeFields,
34 },
35 Enter {
36 id: u64,
37 at: SystemTime,
38 },
39 Exit {
40 id: u64,
41 at: SystemTime,
42 },
43 Close {
44 id: u64,
45 at: SystemTime,
46 },
47 Waker {
48 id: u64,
49 op: super::WakeOp,
50 at: SystemTime,
51 },
52}
53
54pub(crate) struct SerializeFields(pub(crate) Vec<proto::Field>);
55
56struct SerializeField<'a>(&'a proto::Field);
57
58impl Recorder {
59 pub(crate) fn new(path: &Path) -> io::Result<Self> {
60 let file = std::fs::File::create(path)?;
61 let (tx, rx) = crossbeam_channel::bounded(4096);
62 let _worker = std::thread::Builder::new()
63 .name("console/subscriber/recorder/io".into())
64 .spawn(move || {
65 if let Err(e) = record_io(file, rx) {
66 eprintln!("event recorder failed: {}", e);
67 }
68 })?;
69
70 let recorder = Recorder { tx, _worker };
71
72 Ok(recorder)
73 }
74
75 pub(crate) fn record(&self, event: Event) {
76 if self.tx.send(event).is_err() {
77 eprintln!("event recorder thread has terminated!");
78 }
79 }
80}
81
82fn record_io(file: File, rx: Receiver<Event>) -> io::Result<()> {
83 use std::io::{BufWriter, Write};
84
85 fn write<T: Serialize>(mut file: &mut BufWriter<File>, val: &T) -> io::Result<()> {
86 #[allow(clippy::needless_borrows_for_generic_args)]
89 serde_json::to_writer(&mut file, val)?;
90 file.write_all(b"\n")
91 }
92
93 let mut file = BufWriter::new(file);
94 write(
95 &mut file,
96 &Header {
97 v: DATA_FORMAT_VERSION,
98 },
99 )?;
100
101 while let Ok(event) = rx.recv() {
103 write(&mut file, &event)?;
105
106 while let Ok(event) = rx.try_recv() {
108 write(&mut file, &event)?;
109 }
110
111 file.flush()?;
112 }
113
114 tracing::debug!("event stream ended; flushing file");
115 file.flush()
116}
117
118impl serde::Serialize for SerializeFields {
119 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
120 where
121 S: serde::Serializer,
122 {
123 let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
124 for element in &self.0 {
125 seq.serialize_element(&SerializeField(element))?;
126 }
127 seq.end()
128 }
129}
130
131impl serde::Serialize for SerializeField<'_> {
132 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
133 where
134 S: serde::Serializer,
135 {
136 let mut ser = serializer.serialize_struct("Field", 2)?;
137 ser.serialize_field(
138 "name",
139 match self.0.name.as_ref().expect("name") {
140 proto::field::Name::StrName(ref n) => n,
141 proto::field::Name::NameIdx(_idx) => todo!("metadata idx"),
142 },
143 )?;
144
145 match self.0.value.as_ref().expect("field value") {
146 proto::field::Value::DebugVal(v) | proto::field::Value::StrVal(v) => {
147 ser.serialize_field("value", v)?;
148 }
149 proto::field::Value::U64Val(v) => {
150 ser.serialize_field("value", v)?;
151 }
152 proto::field::Value::I64Val(v) => {
153 ser.serialize_field("value", v)?;
154 }
155 proto::field::Value::BoolVal(v) => {
156 ser.serialize_field("value", v)?;
157 }
158 }
159 ser.end()
160 }
161}