timely_logging/
lib.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305

use std::rc::Rc;
use std::cell::RefCell;
use std::any::Any;
use std::collections::HashMap;
use std::time::{Instant, Duration};
use std::fmt::{self, Debug};
use std::marker::PhantomData;

use timely_container::{ContainerBuilder, PushInto};

/// A registry binding names to typed loggers.
pub struct Registry {
    /// A map from names to typed loggers.
    map: HashMap<String, (Box<dyn Any>, Box<dyn Flush>)>,
    /// An instant common to all logging statements.
    time: Instant,
}

impl Registry {
    /// Binds a log name to an action on log event batches.
    ///
    /// This method also returns any pre-installed action, rather than overwriting it
    /// and pivoting the logging destination mid-stream. New loggers with this name will
    /// use the new destination, and existing loggers will use the old destination.
    ///
    /// The action should respond to a sequence of events with non-decreasing timestamps
    /// (Durations) and well as a timestamp that lower bounds the next event that could be
    /// seen (likely greater or equal to the timestamp of the last event). The end of a
    /// logging stream is indicated only by dropping the associated action, which can be
    /// accomplished with `remove` (or a call to insert, though this is not recommended).
    ///
    /// The action needs to follow the requirements of container builder `CB` regarding what they
    /// need to do with containers they receive and what properties to uphold.
    ///
    /// Passing a `&mut None` container to an action indicates a flush.
    pub fn insert<CB: ContainerBuilder, F: FnMut(&Duration, &mut Option<CB::Container>)+'static>(
        &mut self,
        name: &str,
        action: F) -> Option<Box<dyn Any>>
    {
        let logger = Logger::<CB>::new(self.time, Duration::default(), action);
        self.insert_logger(name, logger)
    }

    /// Binds a log name to a logger.
    pub fn insert_logger<CB: ContainerBuilder>(&mut self, name: &str, logger: Logger<CB>) -> Option<Box<dyn Any>> {
        self.map.insert(name.to_owned(), (Box::new(logger.clone()), Box::new(logger))).map(|x| x.0)
    }

    /// Removes a bound logger.
    ///
    /// This is intended primarily to close a logging stream and let the associated writer
    /// communicate that the stream is closed to any consumers. If a binding is not removed,
    /// then the stream cannot be complete as in principle anyone could acquire a handle to
    /// the logger and start further logging.
    pub fn remove(&mut self, name: &str) -> Option<Box<dyn Any>> {
        self.map.remove(name).map(|x| x.0)
    }

    /// Retrieves a shared logger, if one has been inserted.
    pub fn get<CB: ContainerBuilder>(&self, name: &str) -> Option<Logger<CB>> {
        self.map
            .get(name)
            .and_then(|entry| entry.0.downcast_ref::<Logger<CB>>())
            .map(|x| (*x).clone())
    }

    /// Creates a new logger registry.
    pub fn new(time: Instant) -> Self {
        Registry {
            time,
            map: HashMap::new(),
        }
    }

    /// Flushes all registered logs.
    pub fn flush(&mut self) {
        <Self as Flush>::flush(self);
    }
}

impl Flush for Registry {
    fn flush(&self) {
        for value in self.map.values() {
            value.1.flush();
        }
    }
}

/// A buffering logger.
pub struct Logger<CB: ContainerBuilder> {
    inner: Rc<RefCell<LoggerInner<CB, dyn FnMut(&Duration, &mut Option<CB::Container>)>>>,
}

impl<CB: ContainerBuilder> Clone for Logger<CB> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone()
        }
    }
}

impl<CB: ContainerBuilder + Debug> Debug for Logger<CB> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Logger")
            .field("inner", &self.inner)
            .finish()
    }
}

struct LoggerInner<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> {
    /// common instant used for all loggers.
    time:   Instant,
    /// offset to allow re-calibration.
    offset: Duration,
    /// container builder to produce buffers of accumulated log events
    builder: CB,
    /// action to take on full log buffers, or on flush.
    action: A,
}

impl<CB: ContainerBuilder> Logger<CB> {
    /// Allocates a new shareable logger bound to a write destination.
    pub fn new<F>(time: Instant, offset: Duration, action: F) -> Self
    where
        F: FnMut(&Duration, &mut Option<CB::Container>)+'static
    {
        let inner = LoggerInner {
            time,
            offset,
            action,
            builder: CB::default(),
        };
        let inner = Rc::new(RefCell::new(inner));
        Logger { inner }
    }

    /// Logs an event.
    ///
    /// The event has its timestamp recorded at the moment of logging, but it may be delayed
    /// due to buffering. It will be written when the logger is next flushed, either due to
    /// the buffer reaching capacity or a direct call to flush.
    ///
    /// This implementation borrows a shared (but thread-local) buffer of log events, to ensure
    /// that the `action` only sees one stream of events with increasing timestamps. This may
    /// have a cost that we don't entirely understand.
    pub fn log<T>(&self, event: T) where CB: PushInto<(Duration, T)> {
        self.log_many(Some(event));
    }

    /// Logs multiple events.
    ///
    /// The event has its timestamp recorded at the moment of logging, but it may be delayed
    /// due to buffering. It will be written when the logger is next flushed, either due to
    /// the buffer reaching capacity or a direct call to flush.
    ///
    /// All events in this call will have the same timestamp. This can be more performant due
    /// to fewer `time.elapsed()` calls, but it also allows some logged events to appear to be
    /// "transactional", occurring at the same moment.
    ///
    /// This implementation borrows a shared (but thread-local) buffer of log events, to ensure
    /// that the `action` only sees one stream of events with increasing timestamps. This may
    /// have a cost that we don't entirely understand.
    pub fn log_many<I>(&self, events: I)
    where I: IntoIterator, CB: PushInto<(Duration, I::Item)>
    {
        self.inner.borrow_mut().log_many(events)
    }

    /// Flushes logged messages and communicates the new minimal timestamp.
    pub fn flush(&self) {
        <Self as Flush>::flush(self);
    }

    /// Obtain a typed logger.
    pub fn into_typed<T>(self) -> TypedLogger<CB, T> {
        self.into()
    }
}

/// A logger that's typed to specific events. Its `log` functions accept events that can be
/// converted into `T`. Dereferencing a `TypedLogger` gives you a [`Logger`] that can log any
/// compatible type.
///
/// Construct a `TypedLogger` with [`Logger::into_typed`] or by calling `into` on a `Logger`.
#[derive(Debug)]
pub struct TypedLogger<CB: ContainerBuilder, T> {
    inner: Logger<CB>,
    _marker: PhantomData<T>,
}

impl<CB: ContainerBuilder, T> TypedLogger<CB, T> {
    /// Logs an event. Equivalent to [`Logger::log`], with the exception that it converts the
    /// event to `T` before logging.
    pub fn log<S: Into<T>>(&self, event: S)
    where
        CB: PushInto<(Duration, T)>,
    {
        self.inner.log(event.into());
    }

    /// Logs multiple events. Equivalent to [`Logger::log_many`], with the exception that it
    /// converts the events to `T` before logging.
    pub fn log_many<I>(&self, events: I)
    where
        I: IntoIterator, I::Item: Into<T>,
        CB: PushInto<(Duration, T)>,
    {
        self.inner.log_many(events.into_iter().map(Into::into));
    }
}

impl<CB: ContainerBuilder, T> Clone for TypedLogger<CB, T> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            _marker: PhantomData,
        }
    }
}

impl<CB: ContainerBuilder, T> From<Logger<CB>> for TypedLogger<CB, T> {
    fn from(inner: Logger<CB>) -> Self {
        TypedLogger {
            inner,
            _marker: PhantomData,
        }
    }
}

impl<CB: ContainerBuilder, T> std::ops::Deref for TypedLogger<CB, T> {
    type Target = Logger<CB>;
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> LoggerInner<CB, A> {
    /// Push a container with a time at an action.
    #[inline]
    fn push(action: &mut A, time: &Duration, container: &mut CB::Container) {
        let mut c = Some(std::mem::take(container));
        (action)(time, &mut c);
        if let Some(c) = c {
            *container = c;
        }
    }

    fn log_many<I>(&mut self, events: I)
        where I: IntoIterator, CB: PushInto<(Duration, I::Item)>,
    {
        let elapsed = self.time.elapsed() + self.offset;
        for event in events {
            self.builder.push_into((elapsed, event.into()));
            while let Some(container) = self.builder.extract() {
                Self::push(&mut self.action, &elapsed, container);
            }
        }
    }

    fn flush(&mut self) {
        let elapsed = self.time.elapsed() + self.offset;

        while let Some(container) = self.builder.finish() {
            Self::push(&mut self.action, &elapsed, container);
        }

        // Send no container to indicate flush.
        (self.action)(&elapsed, &mut None);
    }
}

/// Flush on the *last* drop of a logger.
impl<CB: ContainerBuilder, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Drop for LoggerInner<CB, A> {
    fn drop(&mut self) {
        self.flush();
    }
}

impl<CB, A: ?Sized + FnMut(&Duration, &mut Option<CB::Container>)> Debug for LoggerInner<CB, A>
where
    CB: ContainerBuilder + Debug,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("LoggerInner")
            .field("time", &self.time)
            .field("offset", &self.offset)
            .field("action", &"FnMut")
            .field("builder", &self.builder)
            .finish()
    }
}

/// Types that can be flushed.
trait Flush {
    /// Flushes buffered data.
    fn flush(&self);
}

impl<CB: ContainerBuilder> Flush for Logger<CB> {
    fn flush(&self) {
        self.inner.borrow_mut().flush()
    }
}