Skip to main content

timely_logging/
lib.rs

1
2use std::rc::Rc;
3use std::cell::RefCell;
4use std::any::Any;
5use std::collections::HashMap;
6use std::time::{Instant, Duration};
7use std::fmt::{self, Debug};
8use std::marker::PhantomData;
9
10use timely_container::{ContainerBuilder, PushInto};
11
12/// A registry binding names to typed loggers.
13pub struct Registry {
14    /// A map from names to typed loggers.
15    map: HashMap<String, (Box<dyn Any>, Box<dyn Flush>)>,
16    /// An instant common to all logging statements.
17    time: Instant,
18}
19
20impl Registry {
21    /// Binds a log name to an action on log event batches.
22    ///
23    /// This method also returns any pre-installed action, rather than overwriting it
24    /// and pivoting the logging destination mid-stream. New loggers with this name will
25    /// use the new destination, and existing loggers will use the old destination.
26    ///
27    /// The action should respond to a sequence of events with non-decreasing timestamps
28    /// (Durations) and well as a timestamp that lower bounds the next event that could be
29    /// seen (likely greater or equal to the timestamp of the last event). The end of a
30    /// logging stream is indicated only by dropping the associated action, which can be
31    /// accomplished with `remove` (or a call to insert, though this is not recommended).
32    ///
33    /// The action needs to follow the requirements of container builder `CB` regarding what they
34    /// need to do with containers they receive and what properties to uphold.
35    ///
36    /// Passing a `&mut None` container to an action indicates a flush.
37    pub fn insert<CB: ContainerBuilder<Container: Default> + 'static, F: FnMut(&Duration, &mut Option<CB::Container>)+'static>(
38        &mut self,
39        name: &str,
40        action: F) -> Option<Box<dyn Any>>
41    {
42        let logger = Logger::<CB>::new(self.time, Duration::default(), action);
43        self.insert_logger(name, logger)
44    }
45
46    /// Binds a log name to a logger.
47    pub fn insert_logger<CB: ContainerBuilder<Container: Default> + 'static>(&mut self, name: &str, logger: Logger<CB>) -> Option<Box<dyn Any>> {
48        self.map.insert(name.to_owned(), (Box::new(logger.clone()), Box::new(logger))).map(|x| x.0)
49    }
50
51    /// Removes a bound logger.
52    ///
53    /// This is intended primarily to close a logging stream and let the associated writer
54    /// communicate that the stream is closed to any consumers. If a binding is not removed,
55    /// then the stream cannot be complete as in principle anyone could acquire a handle to
56    /// the logger and start further logging.
57    pub fn remove(&mut self, name: &str) -> Option<Box<dyn Any>> {
58        self.map.remove(name).map(|x| x.0)
59    }
60
61    /// Retrieves a shared logger, if one has been inserted.
62    pub fn get<CB: ContainerBuilder<Container: Default> + 'static>(&self, name: &str) -> Option<Logger<CB>> {
63        self.map
64            .get(name)
65            .and_then(|entry| entry.0.downcast_ref::<Logger<CB>>())
66            .map(|x| (*x).clone())
67    }
68
69    /// Iterates over the names of currently bound loggers.
70    pub fn names(&self) -> impl Iterator<Item = &str> {
71        self.map.keys().map(String::as_str)
72    }
73
74    /// Creates a new logger registry.
75    pub fn new(time: Instant) -> Self {
76        Registry {
77            time,
78            map: HashMap::new(),
79        }
80    }
81
82    /// Flushes all registered logs.
83    pub fn flush(&mut self) {
84        <Self as Flush>::flush(self);
85    }
86}
87
88impl Flush for Registry {
89    fn flush(&self) {
90        for value in self.map.values() {
91            value.1.flush();
92        }
93    }
94}
95
96/// A buffering logger.
97pub struct Logger<CB: ContainerBuilder<Container: Default>> {
98    inner: Rc<RefCell<LoggerInner<CB, dyn FnMut(&Duration, &mut Option<CB::Container>)>>>,
99}
100
101impl<CB: ContainerBuilder<Container: Default>> Clone for Logger<CB> {
102    fn clone(&self) -> Self {
103        Self {
104            inner: Rc::clone(&self.inner)
105        }
106    }
107}
108
109impl<CB: ContainerBuilder<Container: Default> + Debug> Debug for Logger<CB> {
110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
111        f.debug_struct("Logger")
112            .field("inner", &self.inner)
113            .finish()
114    }
115}
116
117struct LoggerInner<CB: ContainerBuilder<Container: Default>, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> {
118    /// common instant used for all loggers.
119    time:   Instant,
120    /// offset to allow re-calibration.
121    offset: Duration,
122    /// container builder to produce buffers of accumulated log events
123    builder: CB,
124    /// action to take on full log buffers, or on flush.
125    action: A,
126}
127
128impl<CB: ContainerBuilder<Container: Default>> Logger<CB> {
129    /// Allocates a new shareable logger bound to a write destination.
130    pub fn new<F>(time: Instant, offset: Duration, action: F) -> Self
131    where
132        F: FnMut(&Duration, &mut Option<CB::Container>)+'static
133    {
134        let inner = LoggerInner {
135            time,
136            offset,
137            action,
138            builder: CB::default(),
139        };
140        let inner = Rc::new(RefCell::new(inner));
141        Logger { inner }
142    }
143
144    /// Logs an event.
145    ///
146    /// The event has its timestamp recorded at the moment of logging, but it may be delayed
147    /// due to buffering. It will be written when the logger is next flushed, either due to
148    /// the buffer reaching capacity or a direct call to flush.
149    ///
150    /// This implementation borrows a shared (but thread-local) buffer of log events, to ensure
151    /// that the `action` only sees one stream of events with increasing timestamps. This may
152    /// have a cost that we don't entirely understand.
153    pub fn log<T>(&self, event: T) where CB: PushInto<(Duration, T)> {
154        self.log_many(Some(event));
155    }
156
157    /// Logs multiple events.
158    ///
159    /// The event has its timestamp recorded at the moment of logging, but it may be delayed
160    /// due to buffering. It will be written when the logger is next flushed, either due to
161    /// the buffer reaching capacity or a direct call to flush.
162    ///
163    /// All events in this call will have the same timestamp. This can be more performant due
164    /// to fewer `time.elapsed()` calls, but it also allows some logged events to appear to be
165    /// "transactional", occurring at the same moment.
166    ///
167    /// This implementation borrows a shared (but thread-local) buffer of log events, to ensure
168    /// that the `action` only sees one stream of events with increasing timestamps. This may
169    /// have a cost that we don't entirely understand.
170    pub fn log_many<I>(&self, events: I)
171    where I: IntoIterator, CB: PushInto<(Duration, I::Item)>
172    {
173        self.inner.borrow_mut().log_many(events)
174    }
175
176    /// Flushes logged messages and communicates the new minimal timestamp.
177    pub fn flush(&self) {
178        <Self as Flush>::flush(self);
179    }
180
181    /// Obtain a typed logger.
182    pub fn into_typed<T>(self) -> TypedLogger<CB, T> {
183        self.into()
184    }
185}
186
187/// A logger that's typed to specific events. Its `log` functions accept events that can be
188/// converted into `T`. Dereferencing a `TypedLogger` gives you a [`Logger`] that can log any
189/// compatible type.
190///
191/// Construct a `TypedLogger` with [`Logger::into_typed`] or by calling `into` on a `Logger`.
192#[derive(Debug)]
193pub struct TypedLogger<CB: ContainerBuilder<Container: Default>, T> {
194    inner: Logger<CB>,
195    _marker: PhantomData<T>,
196}
197
198impl<CB: ContainerBuilder<Container: Default>, T> TypedLogger<CB, T> {
199    /// Logs an event. Equivalent to [`Logger::log`], with the exception that it converts the
200    /// event to `T` before logging.
201    pub fn log<S: Into<T>>(&self, event: S)
202    where
203        CB: PushInto<(Duration, T)>,
204    {
205        self.inner.log(event.into());
206    }
207
208    /// Logs multiple events. Equivalent to [`Logger::log_many`], with the exception that it
209    /// converts the events to `T` before logging.
210    pub fn log_many<I>(&self, events: I)
211    where
212        I: IntoIterator, I::Item: Into<T>,
213        CB: PushInto<(Duration, T)>,
214    {
215        self.inner.log_many(events.into_iter().map(Into::into));
216    }
217}
218
219impl<CB: ContainerBuilder<Container: Default>, T> Clone for TypedLogger<CB, T> {
220    fn clone(&self) -> Self {
221        Self {
222            inner: self.inner.clone(),
223            _marker: PhantomData,
224        }
225    }
226}
227
228impl<CB: ContainerBuilder<Container: Default>, T> From<Logger<CB>> for TypedLogger<CB, T> {
229    fn from(inner: Logger<CB>) -> Self {
230        TypedLogger {
231            inner,
232            _marker: PhantomData,
233        }
234    }
235}
236
237impl<CB: ContainerBuilder<Container: Default>, T> std::ops::Deref for TypedLogger<CB, T> {
238    type Target = Logger<CB>;
239    fn deref(&self) -> &Self::Target {
240        &self.inner
241    }
242}
243
244impl<CB: ContainerBuilder<Container: Default>, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> LoggerInner<CB, A> {
245    /// Push a container with a time at an action.
246    #[inline]
247    fn push(action: &mut A, time: &Duration, container: &mut CB::Container) {
248        let mut c = Some(std::mem::take(container));
249        action(time, &mut c);
250        if let Some(c) = c {
251            *container = c;
252        }
253    }
254
255    fn log_many<I>(&mut self, events: I)
256        where I: IntoIterator, CB: PushInto<(Duration, I::Item)>,
257    {
258        let elapsed = self.time.elapsed() + self.offset;
259        for event in events {
260            self.builder.push_into((elapsed, event));
261            while let Some(container) = self.builder.extract() {
262                Self::push(&mut self.action, &elapsed, container);
263            }
264        }
265    }
266
267    fn flush(&mut self) {
268        let elapsed = self.time.elapsed() + self.offset;
269
270        while let Some(container) = self.builder.finish() {
271            Self::push(&mut self.action, &elapsed, container);
272        }
273
274        // Send no container to indicate flush.
275        (self.action)(&elapsed, &mut None);
276    }
277}
278
279/// Flush on the *last* drop of a logger.
280impl<CB: ContainerBuilder<Container: Default>, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Drop for LoggerInner<CB, A> {
281    fn drop(&mut self) {
282        self.flush();
283    }
284}
285
286impl<CB, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Debug for LoggerInner<CB, A>
287where
288    CB: ContainerBuilder<Container: Default> + Debug,
289{
290    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
291        f.debug_struct("LoggerInner")
292            .field("time", &self.time)
293            .field("offset", &self.offset)
294            .field("action", &"FnMut")
295            .field("builder", &self.builder)
296            .finish()
297    }
298}
299
/// Implemented by types that hold buffered data which can be flushed on request.
trait Flush {
    /// Flushes any buffered data. Only a shared reference to the value is required.
    fn flush(&self);
}
305
306impl<CB: ContainerBuilder<Container: Default>> Flush for Logger<CB> {
307    fn flush(&self) {
308        self.inner.borrow_mut().flush()
309    }
310}