timely_logging/
lib.rs

1
2use std::rc::Rc;
3use std::cell::RefCell;
4use std::any::Any;
5use std::collections::HashMap;
6use std::time::{Instant, Duration};
7use std::fmt::{self, Debug};
8use std::marker::PhantomData;
9
10use timely_container::{ContainerBuilder, PushInto};
11
12/// A registry binding names to typed loggers.
13pub struct Registry {
14    /// A map from names to typed loggers.
15    map: HashMap<String, (Box<dyn Any>, Box<dyn Flush>)>,
16    /// An instant common to all logging statements.
17    time: Instant,
18}
19
20impl Registry {
21    /// Binds a log name to an action on log event batches.
22    ///
23    /// This method also returns any pre-installed action, rather than overwriting it
24    /// and pivoting the logging destination mid-stream. New loggers with this name will
25    /// use the new destination, and existing loggers will use the old destination.
26    ///
27    /// The action should respond to a sequence of events with non-decreasing timestamps
28    /// (Durations) and well as a timestamp that lower bounds the next event that could be
29    /// seen (likely greater or equal to the timestamp of the last event). The end of a
30    /// logging stream is indicated only by dropping the associated action, which can be
31    /// accomplished with `remove` (or a call to insert, though this is not recommended).
32    ///
33    /// The action needs to follow the requirements of container builder `CB` regarding what they
34    /// need to do with containers they receive and what properties to uphold.
35    ///
36    /// Passing a `&mut None` container to an action indicates a flush.
37    pub fn insert<CB: ContainerBuilder, F: FnMut(&Duration, &mut Option<CB::Container>)+'static>(
38        &mut self,
39        name: &str,
40        action: F) -> Option<Box<dyn Any>>
41    {
42        let logger = Logger::<CB>::new(self.time, Duration::default(), action);
43        self.insert_logger(name, logger)
44    }
45
46    /// Binds a log name to a logger.
47    pub fn insert_logger<CB: ContainerBuilder>(&mut self, name: &str, logger: Logger<CB>) -> Option<Box<dyn Any>> {
48        self.map.insert(name.to_owned(), (Box::new(logger.clone()), Box::new(logger))).map(|x| x.0)
49    }
50
51    /// Removes a bound logger.
52    ///
53    /// This is intended primarily to close a logging stream and let the associated writer
54    /// communicate that the stream is closed to any consumers. If a binding is not removed,
55    /// then the stream cannot be complete as in principle anyone could acquire a handle to
56    /// the logger and start further logging.
57    pub fn remove(&mut self, name: &str) -> Option<Box<dyn Any>> {
58        self.map.remove(name).map(|x| x.0)
59    }
60
61    /// Retrieves a shared logger, if one has been inserted.
62    pub fn get<CB: ContainerBuilder>(&self, name: &str) -> Option<Logger<CB>> {
63        self.map
64            .get(name)
65            .and_then(|entry| entry.0.downcast_ref::<Logger<CB>>())
66            .map(|x| (*x).clone())
67    }
68
69    /// Creates a new logger registry.
70    pub fn new(time: Instant) -> Self {
71        Registry {
72            time,
73            map: HashMap::new(),
74        }
75    }
76
77    /// Flushes all registered logs.
78    pub fn flush(&mut self) {
79        <Self as Flush>::flush(self);
80    }
81}
82
83impl Flush for Registry {
84    fn flush(&self) {
85        for value in self.map.values() {
86            value.1.flush();
87        }
88    }
89}
90
91/// A buffering logger.
92pub struct Logger<CB: ContainerBuilder> {
93    inner: Rc<RefCell<LoggerInner<CB, dyn FnMut(&Duration, &mut Option<CB::Container>)>>>,
94}
95
96impl<CB: ContainerBuilder> Clone for Logger<CB> {
97    fn clone(&self) -> Self {
98        Self {
99            inner: Rc::clone(&self.inner)
100        }
101    }
102}
103
104impl<CB: ContainerBuilder + Debug> Debug for Logger<CB> {
105    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
106        f.debug_struct("Logger")
107            .field("inner", &self.inner)
108            .finish()
109    }
110}
111
112struct LoggerInner<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> {
113    /// common instant used for all loggers.
114    time:   Instant,
115    /// offset to allow re-calibration.
116    offset: Duration,
117    /// container builder to produce buffers of accumulated log events
118    builder: CB,
119    /// action to take on full log buffers, or on flush.
120    action: A,
121}
122
123impl<CB: ContainerBuilder> Logger<CB> {
124    /// Allocates a new shareable logger bound to a write destination.
125    pub fn new<F>(time: Instant, offset: Duration, action: F) -> Self
126    where
127        F: FnMut(&Duration, &mut Option<CB::Container>)+'static
128    {
129        let inner = LoggerInner {
130            time,
131            offset,
132            action,
133            builder: CB::default(),
134        };
135        let inner = Rc::new(RefCell::new(inner));
136        Logger { inner }
137    }
138
139    /// Logs an event.
140    ///
141    /// The event has its timestamp recorded at the moment of logging, but it may be delayed
142    /// due to buffering. It will be written when the logger is next flushed, either due to
143    /// the buffer reaching capacity or a direct call to flush.
144    ///
145    /// This implementation borrows a shared (but thread-local) buffer of log events, to ensure
146    /// that the `action` only sees one stream of events with increasing timestamps. This may
147    /// have a cost that we don't entirely understand.
148    pub fn log<T>(&self, event: T) where CB: PushInto<(Duration, T)> {
149        self.log_many(Some(event));
150    }
151
152    /// Logs multiple events.
153    ///
154    /// The event has its timestamp recorded at the moment of logging, but it may be delayed
155    /// due to buffering. It will be written when the logger is next flushed, either due to
156    /// the buffer reaching capacity or a direct call to flush.
157    ///
158    /// All events in this call will have the same timestamp. This can be more performant due
159    /// to fewer `time.elapsed()` calls, but it also allows some logged events to appear to be
160    /// "transactional", occurring at the same moment.
161    ///
162    /// This implementation borrows a shared (but thread-local) buffer of log events, to ensure
163    /// that the `action` only sees one stream of events with increasing timestamps. This may
164    /// have a cost that we don't entirely understand.
165    pub fn log_many<I>(&self, events: I)
166    where I: IntoIterator, CB: PushInto<(Duration, I::Item)>
167    {
168        self.inner.borrow_mut().log_many(events)
169    }
170
171    /// Flushes logged messages and communicates the new minimal timestamp.
172    pub fn flush(&self) {
173        <Self as Flush>::flush(self);
174    }
175
176    /// Obtain a typed logger.
177    pub fn into_typed<T>(self) -> TypedLogger<CB, T> {
178        self.into()
179    }
180}
181
182/// A logger that's typed to specific events. Its `log` functions accept events that can be
183/// converted into `T`. Dereferencing a `TypedLogger` gives you a [`Logger`] that can log any
184/// compatible type.
185///
186/// Construct a `TypedLogger` with [`Logger::into_typed`] or by calling `into` on a `Logger`.
187#[derive(Debug)]
188pub struct TypedLogger<CB: ContainerBuilder, T> {
189    inner: Logger<CB>,
190    _marker: PhantomData<T>,
191}
192
193impl<CB: ContainerBuilder, T> TypedLogger<CB, T> {
194    /// Logs an event. Equivalent to [`Logger::log`], with the exception that it converts the
195    /// event to `T` before logging.
196    pub fn log<S: Into<T>>(&self, event: S)
197    where
198        CB: PushInto<(Duration, T)>,
199    {
200        self.inner.log(event.into());
201    }
202
203    /// Logs multiple events. Equivalent to [`Logger::log_many`], with the exception that it
204    /// converts the events to `T` before logging.
205    pub fn log_many<I>(&self, events: I)
206    where
207        I: IntoIterator, I::Item: Into<T>,
208        CB: PushInto<(Duration, T)>,
209    {
210        self.inner.log_many(events.into_iter().map(Into::into));
211    }
212}
213
214impl<CB: ContainerBuilder, T> Clone for TypedLogger<CB, T> {
215    fn clone(&self) -> Self {
216        Self {
217            inner: self.inner.clone(),
218            _marker: PhantomData,
219        }
220    }
221}
222
223impl<CB: ContainerBuilder, T> From<Logger<CB>> for TypedLogger<CB, T> {
224    fn from(inner: Logger<CB>) -> Self {
225        TypedLogger {
226            inner,
227            _marker: PhantomData,
228        }
229    }
230}
231
232impl<CB: ContainerBuilder, T> std::ops::Deref for TypedLogger<CB, T> {
233    type Target = Logger<CB>;
234    fn deref(&self) -> &Self::Target {
235        &self.inner
236    }
237}
238
239impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> LoggerInner<CB, A> {
240    /// Push a container with a time at an action.
241    #[inline]
242    fn push(action: &mut A, time: &Duration, container: &mut CB::Container) {
243        let mut c = Some(std::mem::take(container));
244        action(time, &mut c);
245        if let Some(c) = c {
246            *container = c;
247        }
248    }
249
250    fn log_many<I>(&mut self, events: I)
251        where I: IntoIterator, CB: PushInto<(Duration, I::Item)>,
252    {
253        let elapsed = self.time.elapsed() + self.offset;
254        for event in events {
255            self.builder.push_into((elapsed, event));
256            while let Some(container) = self.builder.extract() {
257                Self::push(&mut self.action, &elapsed, container);
258            }
259        }
260    }
261
262    fn flush(&mut self) {
263        let elapsed = self.time.elapsed() + self.offset;
264
265        while let Some(container) = self.builder.finish() {
266            Self::push(&mut self.action, &elapsed, container);
267        }
268
269        // Send no container to indicate flush.
270        (self.action)(&elapsed, &mut None);
271    }
272}
273
274/// Flush on the *last* drop of a logger.
275impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Drop for LoggerInner<CB, A> {
276    fn drop(&mut self) {
277        self.flush();
278    }
279}
280
281impl<CB, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Debug for LoggerInner<CB, A>
282where
283    CB: ContainerBuilder + Debug,
284{
285    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286        f.debug_struct("LoggerInner")
287            .field("time", &self.time)
288            .field("offset", &self.offset)
289            .field("action", &"FnMut")
290            .field("builder", &self.builder)
291            .finish()
292    }
293}
294
/// Something holding buffered data that can be asked to write it out.
///
/// The method takes `&self`, so the trait can be used through shared handles and as
/// a `dyn Flush` trait object.
trait Flush {
    /// Writes out any buffered data.
    fn flush(&self);
}
300
301impl<CB: ContainerBuilder> Flush for Logger<CB> {
302    fn flush(&self) {
303        self.inner.borrow_mut().flush()
304    }
305}