rdkafka/
message.rs

1//! Store and manipulate Kafka messages.
2
3use std::ffi::{CStr, CString};
4use std::fmt;
5use std::marker::PhantomData;
6use std::os::raw::c_void;
7use std::ptr;
8use std::str;
9use std::time::SystemTime;
10
11use rdkafka_sys as rdsys;
12use rdkafka_sys::types::*;
13
14use crate::error::{IsError, KafkaError, KafkaResult};
15use crate::util::{self, millis_to_epoch, KafkaDrop, NativePtr};
16
17/// Timestamp of a Kafka message.
18#[derive(Debug, PartialEq, Eq, Clone, Copy)]
19pub enum Timestamp {
20    /// Timestamp not available.
21    NotAvailable,
22    /// Message creation time.
23    CreateTime(i64),
24    /// Log append time.
25    LogAppendTime(i64),
26}
27
28impl Timestamp {
29    /// Convert the timestamp to milliseconds since epoch.
30    pub fn to_millis(self) -> Option<i64> {
31        match self {
32            Timestamp::NotAvailable | Timestamp::CreateTime(-1) | Timestamp::LogAppendTime(-1) => {
33                None
34            }
35            Timestamp::CreateTime(t) | Timestamp::LogAppendTime(t) => Some(t),
36        }
37    }
38
39    /// Creates a new `Timestamp::CreateTime` representing the current time.
40    pub fn now() -> Timestamp {
41        Timestamp::from(SystemTime::now())
42    }
43}
44
45impl From<i64> for Timestamp {
46    fn from(system_time: i64) -> Timestamp {
47        Timestamp::CreateTime(system_time)
48    }
49}
50
51impl From<SystemTime> for Timestamp {
52    fn from(system_time: SystemTime) -> Timestamp {
53        Timestamp::CreateTime(millis_to_epoch(system_time))
54    }
55}
56
57// Use TryFrom when stable
58//impl From<Timestamp> for i64 {
59//    fn from(timestamp: Timestamp) -> i64 {
60//        timestamp.to_millis().unwrap()
61//    }
62//}
63
64/// A generic representation of Kafka message headers.
65///
66/// This trait represents readable message headers. Headers are key-value pairs
67/// that can be sent alongside every message. Only read-only methods are
68/// provided by this trait, as the underlying storage might not allow
69/// modification.
70pub trait Headers {
71    /// Returns the number of contained headers.
72    fn count(&self) -> usize;
73
74    /// Gets the specified header, where the first header corresponds to
75    /// index 0.
76    ///
77    /// Panics if the index is out of bounds.
78    fn get(&self, idx: usize) -> Header<'_, &[u8]> {
79        self.try_get(idx).unwrap_or_else(|| {
80            panic!(
81                "headers index out of bounds: the count is {} but the index is {}",
82                self.count(),
83                idx,
84            )
85        })
86    }
87
88    /// Like [`Headers::get`], but the value of the header will be converted
89    /// to the specified type.
90    ///
91    /// Panics if the index is out of bounds.
92    fn get_as<V>(&self, idx: usize) -> Result<Header<'_, &V>, V::Error>
93    where
94        V: FromBytes + ?Sized,
95    {
96        self.try_get_as(idx).unwrap_or_else(|| {
97            panic!(
98                "headers index out of bounds: the count is {} but the index is {}",
99                self.count(),
100                idx,
101            )
102        })
103    }
104
105    /// Like [`Headers::get`], but returns an option if the header is out of
106    /// bounds rather than panicking.
107    fn try_get(&self, idx: usize) -> Option<Header<'_, &[u8]>>;
108
109    /// Like [`Headers::get`], but returns an option if the header is out of
110    /// bounds rather than panicking.
111    fn try_get_as<V>(&self, idx: usize) -> Option<Result<Header<'_, &V>, V::Error>>
112    where
113        V: FromBytes + ?Sized,
114    {
115        self.try_get(idx).map(|header| header.parse())
116    }
117
118    /// Iterates over all headers in order.
119    fn iter(&self) -> HeadersIter<'_, Self>
120    where
121        Self: Sized,
122    {
123        HeadersIter {
124            headers: self,
125            index: 0,
126        }
127    }
128}
129
130/// A Kafka message header.
131#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd)]
132pub struct Header<'a, V> {
133    /// The header's key.
134    pub key: &'a str,
135    /// The header's value.
136    pub value: Option<V>,
137}
138
139impl<'a> Header<'a, &'a [u8]> {
140    fn parse<V>(&self) -> Result<Header<'a, &'a V>, V::Error>
141    where
142        V: FromBytes + ?Sized,
143    {
144        Ok(Header {
145            key: self.key,
146            value: self.value.map(V::from_bytes).transpose()?,
147        })
148    }
149}
150
151/// An iterator over [`Headers`].
152pub struct HeadersIter<'a, H> {
153    headers: &'a H,
154    index: usize,
155}
156
157impl<'a, H> Iterator for HeadersIter<'a, H>
158where
159    H: Headers,
160{
161    type Item = Header<'a, &'a [u8]>;
162
163    fn next(&mut self) -> Option<Header<'a, &'a [u8]>> {
164        if self.index < self.headers.count() {
165            let item = self.headers.get(self.index);
166            self.index += 1;
167            Some(item)
168        } else {
169            None
170        }
171    }
172}
173
174/// A generic representation of a Kafka message.
175///
176/// Only read-only methods are provided by this trait, as the underlying storage
177/// might not allow modification.
178pub trait Message {
179    /// The type of headers that this message contains.
180    type Headers: Headers;
181
182    /// Returns the key of the message, or `None` if there is no key.
183    fn key(&self) -> Option<&[u8]>;
184
185    /// Returns the payload of the message, or `None` if there is no payload.
186    fn payload(&self) -> Option<&[u8]>;
187
188    /// Returns a mutable reference to the payload of the message, or `None` if
189    /// there is no payload.
190    ///
191    ///
192    /// # Safety
193    ///
194    /// librdkafka does not formally guarantee that modifying the payload is
195    /// safe. Calling this method may therefore result in undefined behavior.
196    unsafe fn payload_mut(&mut self) -> Option<&mut [u8]>;
197
198    /// Returns the source topic of the message.
199    fn topic(&self) -> &str;
200
201    /// Returns the partition number where the message is stored.
202    fn partition(&self) -> i32;
203
204    /// Returns the offset of the message within the partition.
205    fn offset(&self) -> i64;
206
207    /// Returns the message timestamp.
208    fn timestamp(&self) -> Timestamp;
209
210    /// Converts the raw bytes of the payload to a reference of the specified
211    /// type, that points to the same data inside the message and without
212    /// performing any memory allocation.
213    fn payload_view<P: ?Sized + FromBytes>(&self) -> Option<Result<&P, P::Error>> {
214        self.payload().map(P::from_bytes)
215    }
216
217    /// Converts the raw bytes of the key to a reference of the specified type,
218    /// that points to the same data inside the message and without performing
219    /// any memory allocation.
220    fn key_view<K: ?Sized + FromBytes>(&self) -> Option<Result<&K, K::Error>> {
221        self.key().map(K::from_bytes)
222    }
223
224    /// Returns the headers of the message, or `None` if there are no headers.
225    fn headers(&self) -> Option<&Self::Headers>;
226}
227
228/// A zero-copy collection of Kafka message headers.
229///
230/// Provides a read-only access to headers owned by a Kafka consumer or producer
231/// or by an [`OwnedHeaders`] struct.
232pub struct BorrowedHeaders;
233
234impl BorrowedHeaders {
235    unsafe fn from_native_ptr<T>(
236        _owner: &T,
237        headers_ptr: *mut rdsys::rd_kafka_headers_t,
238    ) -> &BorrowedHeaders {
239        &*(headers_ptr as *mut BorrowedHeaders)
240    }
241
242    fn as_native_ptr(&self) -> *const RDKafkaHeaders {
243        self as *const BorrowedHeaders as *const RDKafkaHeaders
244    }
245
246    /// Clones the content of `BorrowedHeaders` and returns an [`OwnedHeaders`]
247    /// that can outlive the consumer.
248    ///
249    /// This operation requires memory allocation and can be expensive.
250    pub fn detach(&self) -> OwnedHeaders {
251        OwnedHeaders {
252            ptr: unsafe {
253                NativePtr::from_ptr(rdsys::rd_kafka_headers_copy(self.as_native_ptr())).unwrap()
254            },
255        }
256    }
257}
258
259impl Headers for BorrowedHeaders {
260    fn count(&self) -> usize {
261        unsafe { rdsys::rd_kafka_header_cnt(self.as_native_ptr()) }
262    }
263
264    fn try_get(&self, idx: usize) -> Option<Header<'_, &[u8]>> {
265        let mut value_ptr = ptr::null();
266        let mut name_ptr = ptr::null();
267        let mut value_size = 0;
268        let err = unsafe {
269            rdsys::rd_kafka_header_get_all(
270                self.as_native_ptr(),
271                idx,
272                &mut name_ptr,
273                &mut value_ptr,
274                &mut value_size,
275            )
276        };
277        if err.is_error() {
278            None
279        } else {
280            unsafe {
281                Some(Header {
282                    key: CStr::from_ptr(name_ptr).to_str().unwrap(),
283                    value: (!value_ptr.is_null())
284                        .then(|| util::ptr_to_slice(value_ptr, value_size)),
285                })
286            }
287        }
288    }
289}
290
291/// A zero-copy Kafka message.
292///
293/// Provides a read-only access to headers owned by a Kafka consumer or producer
294/// or by an [`OwnedMessage`] struct.
295///
296/// ## Consumers
297///
298/// `BorrowedMessage`s coming from consumers are removed from the consumer
299/// buffer once they are dropped. Holding references to too many messages will
300/// cause the memory of the consumer to fill up and the consumer to block until
301/// some of the `BorrowedMessage`s are dropped.
302///
303/// ## Conversion to owned
304///
305/// To transform a `BorrowedMessage` into a [`OwnedMessage`], use the
306/// [`detach`](BorrowedMessage::detach) method.
307pub struct BorrowedMessage<'a> {
308    ptr: NativePtr<RDKafkaMessage>,
309    _owner: PhantomData<&'a u8>,
310}
311
312unsafe impl KafkaDrop for RDKafkaMessage {
313    const TYPE: &'static str = "message";
314    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_message_destroy;
315}
316
317impl<'a> fmt::Debug for BorrowedMessage<'a> {
318    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
319        write!(f, "Message {{ ptr: {:?} }}", self.ptr())
320    }
321}
322
323impl<'a> BorrowedMessage<'a> {
324    /// Creates a new `BorrowedMessage` that wraps the native Kafka message
325    /// pointer returned by a consumer. The lifetime of the message will be
326    /// bound to the lifetime of the consumer passed as parameter. This method
327    /// should only be used with messages coming from consumers. If the message
328    /// contains an error, only the error is returned and the message structure
329    /// is freed.
330    pub(crate) unsafe fn from_consumer<C>(
331        ptr: NativePtr<RDKafkaMessage>,
332        _consumer: &'a C,
333    ) -> KafkaResult<BorrowedMessage<'a>> {
334        if ptr.err.is_error() {
335            let err = match ptr.err {
336                rdsys::rd_kafka_resp_err_t::RD_KAFKA_RESP_ERR__PARTITION_EOF => {
337                    KafkaError::PartitionEOF((*ptr).partition)
338                }
339                e => KafkaError::MessageConsumption(e.into()),
340            };
341            Err(err)
342        } else {
343            Ok(BorrowedMessage {
344                ptr,
345                _owner: PhantomData,
346            })
347        }
348    }
349
350    /// Creates a new `BorrowedMessage` that wraps the native Kafka message
351    /// pointer returned by the delivery callback of a producer. The lifetime of
352    /// the message will be bound to the lifetime of the reference passed as
353    /// parameter. This method should only be used with messages coming from the
354    /// delivery callback. The message will not be freed in any circumstance.
355    pub(crate) unsafe fn from_dr_callback<O>(
356        ptr: *mut RDKafkaMessage,
357        _owner: &'a O,
358    ) -> DeliveryResult<'a> {
359        let borrowed_message = BorrowedMessage {
360            ptr: NativePtr::from_ptr(ptr).unwrap(),
361            _owner: PhantomData,
362        };
363        if (*ptr).err.is_error() {
364            Err((
365                KafkaError::MessageProduction((*ptr).err.into()),
366                borrowed_message,
367            ))
368        } else {
369            Ok(borrowed_message)
370        }
371    }
372
373    /// Returns a pointer to the [`RDKafkaMessage`].
374    pub fn ptr(&self) -> *mut RDKafkaMessage {
375        self.ptr.ptr()
376    }
377
378    /// Returns a pointer to the message's [`RDKafkaTopic`]
379    pub fn topic_ptr(&self) -> *mut RDKafkaTopic {
380        self.ptr.rkt
381    }
382
383    /// Returns the length of the key field of the message.
384    pub fn key_len(&self) -> usize {
385        self.ptr.key_len
386    }
387
388    /// Returns the length of the payload field of the message.
389    pub fn payload_len(&self) -> usize {
390        self.ptr.len
391    }
392
393    /// Clones the content of the `BorrowedMessage` and returns an
394    /// [`OwnedMessage`] that can outlive the consumer.
395    ///
396    /// This operation requires memory allocation and can be expensive.
397    pub fn detach(&self) -> OwnedMessage {
398        OwnedMessage {
399            key: self.key().map(|k| k.to_vec()),
400            payload: self.payload().map(|p| p.to_vec()),
401            topic: self.topic().to_owned(),
402            timestamp: self.timestamp(),
403            partition: self.partition(),
404            offset: self.offset(),
405            headers: self.headers().map(BorrowedHeaders::detach),
406        }
407    }
408}
409
410impl<'a> Message for BorrowedMessage<'a> {
411    type Headers = BorrowedHeaders;
412
413    fn key(&self) -> Option<&[u8]> {
414        unsafe { util::ptr_to_opt_slice((*self.ptr).key, (*self.ptr).key_len) }
415    }
416
417    fn payload(&self) -> Option<&[u8]> {
418        unsafe { util::ptr_to_opt_slice((*self.ptr).payload, (*self.ptr).len) }
419    }
420
421    unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> {
422        util::ptr_to_opt_mut_slice((*self.ptr).payload, (*self.ptr).len)
423    }
424
425    fn topic(&self) -> &str {
426        unsafe {
427            CStr::from_ptr(rdsys::rd_kafka_topic_name((*self.ptr).rkt))
428                .to_str()
429                .expect("Topic name is not valid UTF-8")
430        }
431    }
432
433    fn partition(&self) -> i32 {
434        self.ptr.partition
435    }
436
437    fn offset(&self) -> i64 {
438        self.ptr.offset
439    }
440
441    fn timestamp(&self) -> Timestamp {
442        let mut timestamp_type = rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_NOT_AVAILABLE;
443        let timestamp =
444            unsafe { rdsys::rd_kafka_message_timestamp(self.ptr.ptr(), &mut timestamp_type) };
445        if timestamp == -1 {
446            Timestamp::NotAvailable
447        } else {
448            match timestamp_type {
449                rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_NOT_AVAILABLE => {
450                    Timestamp::NotAvailable
451                }
452                rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_CREATE_TIME => {
453                    Timestamp::CreateTime(timestamp)
454                }
455                rdsys::rd_kafka_timestamp_type_t::RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME => {
456                    Timestamp::LogAppendTime(timestamp)
457                }
458            }
459        }
460    }
461
462    fn headers(&self) -> Option<&BorrowedHeaders> {
463        let mut native_headers_ptr = ptr::null_mut();
464        unsafe {
465            let err = rdsys::rd_kafka_message_headers(self.ptr.ptr(), &mut native_headers_ptr);
466            match err.into() {
467                RDKafkaErrorCode::NoError => {
468                    Some(BorrowedHeaders::from_native_ptr(self, native_headers_ptr))
469                }
470                RDKafkaErrorCode::NoEnt => None,
471                _ => None,
472            }
473        }
474    }
475}
476
477unsafe impl<'a> Send for BorrowedMessage<'a> {}
478unsafe impl<'a> Sync for BorrowedMessage<'a> {}
479
480//
481// ********** OWNED MESSAGE **********
482//
483
484/// A collection of Kafka message headers that owns its backing data.
485///
486/// Kafka supports associating an array of key-value pairs to every message,
487/// called message headers. The `OwnedHeaders` can be used to create the desired
488/// headers and to pass them to the producer. See also [`BorrowedHeaders`].
489#[derive(Debug)]
490pub struct OwnedHeaders {
491    ptr: NativePtr<RDKafkaHeaders>,
492}
493
494unsafe impl KafkaDrop for RDKafkaHeaders {
495    const TYPE: &'static str = "headers";
496    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_headers_destroy;
497}
498
499unsafe impl Send for OwnedHeaders {}
500unsafe impl Sync for OwnedHeaders {}
501
502impl OwnedHeaders {
503    /// Creates a new `OwnedHeaders` struct with initial capacity 5.
504    pub fn new() -> OwnedHeaders {
505        OwnedHeaders::new_with_capacity(5)
506    }
507
508    /// Creates a new `OwnedHeaders` struct with the desired initial capacity.
509    /// The structure is automatically resized as more headers are added.
510    pub fn new_with_capacity(initial_capacity: usize) -> OwnedHeaders {
511        OwnedHeaders {
512            ptr: unsafe {
513                NativePtr::from_ptr(rdsys::rd_kafka_headers_new(initial_capacity)).unwrap()
514            },
515        }
516    }
517
518    /// Inserts a new header.
519    pub fn insert<V>(self, header: Header<'_, &V>) -> OwnedHeaders
520    where
521        V: ToBytes + ?Sized,
522    {
523        let name_cstring = CString::new(header.key.to_owned()).unwrap();
524        let (value_ptr, value_len) = match header.value {
525            None => (ptr::null_mut(), 0),
526            Some(value) => {
527                let value_bytes = value.to_bytes();
528                (
529                    value_bytes.as_ptr() as *mut c_void,
530                    value_bytes.len() as isize,
531                )
532            }
533        };
534        let err = unsafe {
535            rdsys::rd_kafka_header_add(
536                self.ptr(),
537                name_cstring.as_ptr(),
538                name_cstring.as_bytes().len() as isize,
539                value_ptr,
540                value_len,
541            )
542        };
543        // OwnedHeaders should always represent writable instances of RDKafkaHeaders
544        assert!(!err.is_error());
545        self
546    }
547
548    pub(crate) fn ptr(&self) -> *mut RDKafkaHeaders {
549        self.ptr.ptr()
550    }
551
552    /// Generates a read-only [`BorrowedHeaders`] reference.
553    pub fn as_borrowed(&self) -> &BorrowedHeaders {
554        unsafe { &*(self.ptr() as *mut RDKafkaHeaders as *mut BorrowedHeaders) }
555    }
556}
557
558impl Default for OwnedHeaders {
559    fn default() -> OwnedHeaders {
560        OwnedHeaders::new()
561    }
562}
563
564impl Headers for OwnedHeaders {
565    fn count(&self) -> usize {
566        unsafe { rdsys::rd_kafka_header_cnt(self.ptr()) }
567    }
568
569    fn try_get(&self, idx: usize) -> Option<Header<'_, &[u8]>> {
570        self.as_borrowed().try_get(idx)
571    }
572}
573
574impl Clone for OwnedHeaders {
575    fn clone(&self) -> Self {
576        OwnedHeaders {
577            ptr: unsafe { NativePtr::from_ptr(rdsys::rd_kafka_headers_copy(self.ptr())).unwrap() },
578        }
579    }
580}
581
582/// A Kafka message that owns its backing data.
583///
584/// An `OwnedMessage` can be created from a [`BorrowedMessage`] using the
585/// [`BorrowedMessage::detach`] method. `OwnedMessage`s don't hold any reference
586/// to the consumer and don't use any memory inside the consumer buffer.
587#[derive(Debug, Clone)]
588pub struct OwnedMessage {
589    payload: Option<Vec<u8>>,
590    key: Option<Vec<u8>>,
591    topic: String,
592    timestamp: Timestamp,
593    partition: i32,
594    offset: i64,
595    headers: Option<OwnedHeaders>,
596}
597
598impl OwnedMessage {
599    /// Creates a new message with the specified content.
600    ///
601    /// This function is mainly useful in tests of `rust-rdkafka` itself.
602    pub fn new(
603        payload: Option<Vec<u8>>,
604        key: Option<Vec<u8>>,
605        topic: String,
606        timestamp: Timestamp,
607        partition: i32,
608        offset: i64,
609        headers: Option<OwnedHeaders>,
610    ) -> OwnedMessage {
611        OwnedMessage {
612            payload,
613            key,
614            topic,
615            timestamp,
616            partition,
617            offset,
618            headers,
619        }
620    }
621
622    /// Detaches the [`OwnedHeaders`] from this `OwnedMessage`.
623    pub fn detach_headers(&mut self) -> Option<OwnedHeaders> {
624        self.headers.take()
625    }
626}
627
628impl Message for OwnedMessage {
629    type Headers = OwnedHeaders;
630
631    fn key(&self) -> Option<&[u8]> {
632        match self.key {
633            Some(ref k) => Some(k.as_slice()),
634            None => None,
635        }
636    }
637
638    fn payload(&self) -> Option<&[u8]> {
639        self.payload.as_deref()
640    }
641
642    unsafe fn payload_mut(&mut self) -> Option<&mut [u8]> {
643        self.payload.as_deref_mut()
644    }
645
646    fn topic(&self) -> &str {
647        self.topic.as_ref()
648    }
649
650    fn partition(&self) -> i32 {
651        self.partition
652    }
653
654    fn offset(&self) -> i64 {
655        self.offset
656    }
657
658    fn timestamp(&self) -> Timestamp {
659        self.timestamp
660    }
661
662    fn headers(&self) -> Option<&OwnedHeaders> {
663        self.headers.as_ref()
664    }
665}
666
667/// The result of a message production.
668///
669/// If message production is successful `DeliveryResult` will contain the sent
670/// message, which can be used to find which partition and offset the message
671/// was sent to. If message production is not successful, the `DeliveryResult`
672/// will contain an error and the message that failed to be sent. The partition
673/// and offset, in this case, will default to -1 and 0 respectively.
674///
675/// ## Lifetimes
676///
677/// In both success or failure scenarios, the payload of the message resides in
678/// the buffer of the producer and will be automatically removed once the
679/// `delivery` callback finishes.
680pub type DeliveryResult<'a> = Result<BorrowedMessage<'a>, (KafkaError, BorrowedMessage<'a>)>;
681
682/// A cheap conversion from a byte slice to typed data.
683///
684/// Given a reference to a byte slice, returns a different view of the same
685/// data. No allocation is performed, however the underlying data might be
686/// checked for correctness (for example when converting to `str`).
687///
688/// See also the [`ToBytes`] trait.
689pub trait FromBytes {
690    /// The error type that will be returned if the conversion fails.
691    type Error;
692    /// Tries to convert the provided byte slice into a different type.
693    fn from_bytes(_: &[u8]) -> Result<&Self, Self::Error>;
694}
695
696impl FromBytes for [u8] {
697    type Error = ();
698    fn from_bytes(bytes: &[u8]) -> Result<&Self, Self::Error> {
699        Ok(bytes)
700    }
701}
702
703impl FromBytes for str {
704    type Error = str::Utf8Error;
705    fn from_bytes(bytes: &[u8]) -> Result<&Self, Self::Error> {
706        str::from_utf8(bytes)
707    }
708}
709
710/// A cheap conversion from typed data to a byte slice.
711///
712/// Given some data, returns the byte representation of that data.
713/// No copy of the data should be performed.
714///
715/// See also the [`FromBytes`] trait.
716pub trait ToBytes {
717    /// Converts the provided data to bytes.
718    fn to_bytes(&self) -> &[u8];
719}
720
721impl ToBytes for [u8] {
722    fn to_bytes(&self) -> &[u8] {
723        self
724    }
725}
726
727impl ToBytes for str {
728    fn to_bytes(&self) -> &[u8] {
729        self.as_bytes()
730    }
731}
732
733impl ToBytes for Vec<u8> {
734    fn to_bytes(&self) -> &[u8] {
735        self.as_slice()
736    }
737}
738
739impl ToBytes for String {
740    fn to_bytes(&self) -> &[u8] {
741        self.as_bytes()
742    }
743}
744
745impl<'a, T: ToBytes> ToBytes for &'a T {
746    fn to_bytes(&self) -> &[u8] {
747        (*self).to_bytes()
748    }
749}
750
751impl ToBytes for () {
752    fn to_bytes(&self) -> &[u8] {
753        &[]
754    }
755}
756
757// Implement to_bytes for arrays - https://github.com/rust-lang/rfcs/issues/1038
758macro_rules! array_impls {
759    ($($N:expr)+) => {
760        $(
761            impl ToBytes for [u8; $N] {
762                fn to_bytes(&self) -> &[u8] { self }
763            }
764         )+
765    }
766}
767
768array_impls! {
769     0  1  2  3  4  5  6  7  8  9
770    10 11 12 13 14 15 16 17 18 19
771    20 21 22 23 24 25 26 27 28 29
772    30 31 32
773}
774
775#[cfg(test)]
776mod test {
777    use super::*;
778    use std::time::SystemTime;
779
780    #[test]
781    fn test_timestamp_creation() {
782        let now = SystemTime::now();
783        let t1 = Timestamp::now();
784        let t2 = Timestamp::from(now);
785        let expected = Timestamp::CreateTime(util::millis_to_epoch(now));
786
787        assert_eq!(t2, expected);
788        assert!(t1.to_millis().unwrap() - t2.to_millis().unwrap() < 10);
789    }
790
791    #[test]
792    fn test_timestamp_conversion() {
793        assert_eq!(Timestamp::CreateTime(100).to_millis(), Some(100));
794        assert_eq!(Timestamp::LogAppendTime(100).to_millis(), Some(100));
795        assert_eq!(Timestamp::CreateTime(-1).to_millis(), None);
796        assert_eq!(Timestamp::LogAppendTime(-1).to_millis(), None);
797        assert_eq!(Timestamp::NotAvailable.to_millis(), None);
798        let t: Timestamp = 100.into();
799        assert_eq!(t, Timestamp::CreateTime(100));
800    }
801
802    #[test]
803    fn test_headers() {
804        let owned = OwnedHeaders::new()
805            .insert(Header {
806                key: "key1",
807                value: Some("value1"),
808            })
809            .insert(Header {
810                key: "key2",
811                value: Some("value2"),
812            });
813        assert_eq!(
814            owned.get(0),
815            Header {
816                key: "key1",
817                value: Some(&[118, 97, 108, 117, 101, 49][..])
818            }
819        );
820        assert_eq!(
821            owned.get_as::<str>(1),
822            Ok(Header {
823                key: "key2",
824                value: Some("value2")
825            })
826        );
827    }
828}