console_subscriber/
record.rs

1use console_api as proto;
2use crossbeam_channel::{Receiver, Sender};
3use serde::{
4    ser::{SerializeSeq, SerializeStruct},
5    Serialize,
6};
7use std::{fs::File, io, path::Path, time::SystemTime};
8
/// Version of the recording format understood by this code. It is written
/// into the [`Header`] line that starts every recording.
///
/// Increase this whenever the format changes in a breaking way that existing
/// parsers cannot handle. Even better, we should probably support parsing
/// older versions.
///
/// But while this is in rapid development, we can move fast and break things.
const DATA_FORMAT_VERSION: u8 = 1;
16
17pub(crate) struct Recorder {
18    tx: Sender<Event>,
19    // TODO(eliza): terminate and flush when dropping...
20    _worker: std::thread::JoinHandle<()>,
21}
22
23#[derive(Serialize)]
24struct Header {
25    v: u8,
26}
27
28#[derive(Serialize)]
29pub(crate) enum Event {
30    Spawn {
31        id: u64,
32        at: SystemTime,
33        fields: SerializeFields,
34    },
35    Enter {
36        id: u64,
37        at: SystemTime,
38    },
39    Exit {
40        id: u64,
41        at: SystemTime,
42    },
43    Close {
44        id: u64,
45        at: SystemTime,
46    },
47    Waker {
48        id: u64,
49        op: super::WakeOp,
50        at: SystemTime,
51    },
52}
53
54pub(crate) struct SerializeFields(pub(crate) Vec<proto::Field>);
55
56struct SerializeField<'a>(&'a proto::Field);
57
58impl Recorder {
59    pub(crate) fn new(path: &Path) -> io::Result<Self> {
60        let file = std::fs::File::create(path)?;
61        let (tx, rx) = crossbeam_channel::bounded(4096);
62        let _worker = std::thread::Builder::new()
63            .name("console/subscriber/recorder/io".into())
64            .spawn(move || {
65                if let Err(e) = record_io(file, rx) {
66                    eprintln!("event recorder failed: {}", e);
67                }
68            })?;
69
70        let recorder = Recorder { tx, _worker };
71
72        Ok(recorder)
73    }
74
75    pub(crate) fn record(&self, event: Event) {
76        if self.tx.send(event).is_err() {
77            eprintln!("event recorder thread has terminated!");
78        }
79    }
80}
81
82fn record_io(file: File, rx: Receiver<Event>) -> io::Result<()> {
83    use std::io::{BufWriter, Write};
84
85    fn write<T: Serialize>(mut file: &mut BufWriter<File>, val: &T) -> io::Result<()> {
86        // Clippy throws a false positive here. We can't actually pass the owned `file` to
87        // `to_writer` because we need it again in the line blow.
88        #[allow(clippy::needless_borrows_for_generic_args)]
89        serde_json::to_writer(&mut file, val)?;
90        file.write_all(b"\n")
91    }
92
93    let mut file = BufWriter::new(file);
94    write(
95        &mut file,
96        &Header {
97            v: DATA_FORMAT_VERSION,
98        },
99    )?;
100
101    // wait to receive an event...
102    while let Ok(event) = rx.recv() {
103        // TODO: what to do if file error?
104        write(&mut file, &event)?;
105
106        // drain any additional events that are ready now
107        while let Ok(event) = rx.try_recv() {
108            write(&mut file, &event)?;
109        }
110
111        file.flush()?;
112    }
113
114    tracing::debug!("event stream ended; flushing file");
115    file.flush()
116}
117
118impl serde::Serialize for SerializeFields {
119    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
120    where
121        S: serde::Serializer,
122    {
123        let mut seq = serializer.serialize_seq(Some(self.0.len()))?;
124        for element in &self.0 {
125            seq.serialize_element(&SerializeField(element))?;
126        }
127        seq.end()
128    }
129}
130
131impl serde::Serialize for SerializeField<'_> {
132    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
133    where
134        S: serde::Serializer,
135    {
136        let mut ser = serializer.serialize_struct("Field", 2)?;
137        ser.serialize_field(
138            "name",
139            match self.0.name.as_ref().expect("name") {
140                proto::field::Name::StrName(ref n) => n,
141                proto::field::Name::NameIdx(_idx) => todo!("metadata idx"),
142            },
143        )?;
144
145        match self.0.value.as_ref().expect("field value") {
146            proto::field::Value::DebugVal(v) | proto::field::Value::StrVal(v) => {
147                ser.serialize_field("value", v)?;
148            }
149            proto::field::Value::U64Val(v) => {
150                ser.serialize_field("value", v)?;
151            }
152            proto::field::Value::I64Val(v) => {
153                ser.serialize_field("value", v)?;
154            }
155            proto::field::Value::BoolVal(v) => {
156                ser.serialize_field("value", v)?;
157            }
158        }
159        ser.end()
160    }
161}