rdkafka/
topic_partition_list.rs

1//! Data structures representing topic, partitions and offsets.
2//!
3//! Compatible with the `RDKafkaTopicPartitionList` exported by `rdkafka-sys`.
4
5use std::collections::HashMap;
6use std::ffi::{CStr, CString};
7use std::fmt;
8use std::slice;
9use std::str;
10
11use libc::c_void;
12use rdkafka_sys as rdsys;
13use rdkafka_sys::types::*;
14
15use crate::error::{IsError, KafkaError, KafkaResult};
16use crate::util::{self, KafkaDrop, NativePtr};
17
18const PARTITION_UNASSIGNED: i32 = -1;
19
20const OFFSET_BEGINNING: i64 = rdsys::RD_KAFKA_OFFSET_BEGINNING as i64;
21const OFFSET_END: i64 = rdsys::RD_KAFKA_OFFSET_END as i64;
22const OFFSET_STORED: i64 = rdsys::RD_KAFKA_OFFSET_STORED as i64;
23const OFFSET_INVALID: i64 = rdsys::RD_KAFKA_OFFSET_INVALID as i64;
24const OFFSET_TAIL_BASE: i64 = rdsys::RD_KAFKA_OFFSET_TAIL_BASE as i64;
25
26/// A Kafka offset.
27#[derive(Copy, Clone, Debug, PartialEq, Eq)]
28pub enum Offset {
29    /// Start consuming from the beginning of the partition.
30    Beginning,
31    /// Start consuming from the end of the partition.
32    End,
33    /// Start consuming from the stored offset.
34    Stored,
35    /// Offset not assigned or invalid.
36    Invalid,
37    /// A specific offset to consume from.
38    ///
39    /// Note that while the offset is a signed integer, negative offsets will be
40    /// rejected when passed to librdkafka.
41    Offset(i64),
42    /// An offset relative to the end of the partition.
43    ///
44    /// Note that while the offset is a signed integer, negative offsets will
45    /// be rejected when passed to librdkafka.
46    OffsetTail(i64),
47}
48
49impl Offset {
50    /// Converts the integer representation of an offset used by librdkafka to
51    /// an `Offset`.
52    pub fn from_raw(raw_offset: i64) -> Offset {
53        match raw_offset {
54            OFFSET_BEGINNING => Offset::Beginning,
55            OFFSET_END => Offset::End,
56            OFFSET_STORED => Offset::Stored,
57            OFFSET_INVALID => Offset::Invalid,
58            n if n <= OFFSET_TAIL_BASE => Offset::OffsetTail(-(n - OFFSET_TAIL_BASE)),
59            n => Offset::Offset(n),
60        }
61    }
62
63    /// Converts the `Offset` to the internal integer representation used by
64    /// librdkafka.
65    ///
66    /// Returns `None` if the offset cannot be represented in librdkafka's
67    /// internal representation.
68    pub fn to_raw(self) -> Option<i64> {
69        match self {
70            Offset::Beginning => Some(OFFSET_BEGINNING),
71            Offset::End => Some(OFFSET_END),
72            Offset::Stored => Some(OFFSET_STORED),
73            Offset::Invalid => Some(OFFSET_INVALID),
74            Offset::Offset(n) if n >= 0 => Some(n),
75            Offset::OffsetTail(n) if n > 0 => Some(OFFSET_TAIL_BASE - n),
76            Offset::Offset(_) | Offset::OffsetTail(_) => None,
77        }
78    }
79}
80
81// TODO: implement Debug
82/// One element of the topic partition list.
83pub struct TopicPartitionListElem<'a> {
84    ptr: &'a mut RDKafkaTopicPartition,
85}
86
87impl<'a> TopicPartitionListElem<'a> {
88    // _owner_list serves as a marker so that the lifetime isn't too long
89    fn from_ptr(
90        _owner_list: &'a TopicPartitionList,
91        ptr: &'a mut RDKafkaTopicPartition,
92    ) -> TopicPartitionListElem<'a> {
93        TopicPartitionListElem { ptr }
94    }
95
96    /// Returns the topic name.
97    pub fn topic(&self) -> &str {
98        unsafe {
99            let c_str = self.ptr.topic;
100            CStr::from_ptr(c_str)
101                .to_str()
102                .expect("Topic name is not UTF-8")
103        }
104    }
105
106    /// Returns the optional error associated to the specific entry in the TPL.
107    pub fn error(&self) -> KafkaResult<()> {
108        let kafka_err = self.ptr.err;
109        if kafka_err.is_error() {
110            Err(KafkaError::OffsetFetch(kafka_err.into()))
111        } else {
112            Ok(())
113        }
114    }
115
116    /// Returns the partition number.
117    pub fn partition(&self) -> i32 {
118        self.ptr.partition
119    }
120
121    /// Returns the offset.
122    pub fn offset(&self) -> Offset {
123        let raw_offset = self.ptr.offset;
124        Offset::from_raw(raw_offset)
125    }
126
127    /// Sets the offset.
128    pub fn set_offset(&mut self, offset: Offset) -> KafkaResult<()> {
129        match offset.to_raw() {
130            Some(offset) => {
131                self.ptr.offset = offset;
132                Ok(())
133            }
134            None => Err(KafkaError::SetPartitionOffset(
135                RDKafkaErrorCode::InvalidArgument,
136            )),
137        }
138    }
139
140    /// Returns the optional metadata associated with the entry.
141    pub fn metadata(&self) -> &str {
142        let bytes = unsafe { util::ptr_to_slice(self.ptr.metadata, self.ptr.metadata_size) };
143        str::from_utf8(bytes).expect("Metadata is not UTF-8")
144    }
145
146    /// Sets the optional metadata associated with the entry.
147    pub fn set_metadata<M>(&mut self, metadata: M)
148    where
149        M: AsRef<str>,
150    {
151        let metadata = metadata.as_ref();
152        let buf = unsafe { libc::malloc(metadata.len()) };
153        unsafe { libc::memcpy(buf, metadata.as_ptr() as *const c_void, metadata.len()) };
154        self.ptr.metadata = buf;
155        self.ptr.metadata_size = metadata.len();
156    }
157}
158
159impl<'a> PartialEq for TopicPartitionListElem<'a> {
160    fn eq(&self, other: &TopicPartitionListElem<'a>) -> bool {
161        self.topic() == other.topic()
162            && self.partition() == other.partition()
163            && self.offset() == other.offset()
164            && self.metadata() == other.metadata()
165    }
166}
167
168/// A structure to store and manipulate a list of topics and partitions with optional offsets.
169pub struct TopicPartitionList {
170    ptr: NativePtr<RDKafkaTopicPartitionList>,
171}
172
173unsafe impl KafkaDrop for RDKafkaTopicPartitionList {
174    const TYPE: &'static str = "topic partition list";
175    const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_topic_partition_list_destroy;
176}
177
178impl Clone for TopicPartitionList {
179    fn clone(&self) -> Self {
180        let new_tpl = unsafe { rdsys::rd_kafka_topic_partition_list_copy(self.ptr()) };
181        unsafe { TopicPartitionList::from_ptr(new_tpl) }
182    }
183}
184
185impl TopicPartitionList {
186    /// Creates a new empty list with default capacity.
187    pub fn new() -> TopicPartitionList {
188        TopicPartitionList::with_capacity(5)
189    }
190
191    /// Creates a new empty list with the specified capacity.
192    pub fn with_capacity(capacity: usize) -> TopicPartitionList {
193        let ptr = unsafe { rdsys::rd_kafka_topic_partition_list_new(capacity as i32) };
194        unsafe { TopicPartitionList::from_ptr(ptr) }
195    }
196
197    /// Transforms a pointer to the native librdkafka RDTopicPartitionList into a
198    /// managed `TopicPartitionList` instance.
199    pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaTopicPartitionList) -> TopicPartitionList {
200        TopicPartitionList {
201            ptr: NativePtr::from_ptr(ptr).unwrap(),
202        }
203    }
204
205    /// Given a topic map, generates a new `TopicPartitionList`.
206    pub fn from_topic_map(
207        topic_map: &HashMap<(String, i32), Offset>,
208    ) -> KafkaResult<TopicPartitionList> {
209        let mut tpl = TopicPartitionList::with_capacity(topic_map.len());
210        for ((topic_name, partition), offset) in topic_map {
211            tpl.add_partition_offset(topic_name, *partition, *offset)?;
212        }
213        Ok(tpl)
214    }
215
216    /// Returns the pointer to the internal librdkafka structure.
217    pub fn ptr(&self) -> *mut RDKafkaTopicPartitionList {
218        self.ptr.ptr()
219    }
220
221    /// Returns the number of elements in the list.
222    pub fn count(&self) -> usize {
223        self.ptr.cnt as usize
224    }
225
226    /// Returns the capacity of the list.
227    pub fn capacity(&self) -> usize {
228        self.ptr.size as usize
229    }
230
231    /// Adds a topic with unassigned partitions to the list.
232    pub fn add_topic_unassigned<'a>(&'a mut self, topic: &str) -> TopicPartitionListElem<'a> {
233        self.add_partition(topic, PARTITION_UNASSIGNED)
234    }
235
236    /// Adds a topic and partition to the list.
237    pub fn add_partition<'a>(
238        &'a mut self,
239        topic: &str,
240        partition: i32,
241    ) -> TopicPartitionListElem<'a> {
242        let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
243        let tp_ptr = unsafe {
244            rdsys::rd_kafka_topic_partition_list_add(self.ptr(), topic_c.as_ptr(), partition)
245        };
246        unsafe { TopicPartitionListElem::from_ptr(self, &mut *tp_ptr) }
247    }
248
249    /// Adds a topic and partition range to the list.
250    pub fn add_partition_range(&mut self, topic: &str, start_partition: i32, stop_partition: i32) {
251        let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
252        unsafe {
253            rdsys::rd_kafka_topic_partition_list_add_range(
254                self.ptr(),
255                topic_c.as_ptr(),
256                start_partition,
257                stop_partition,
258            );
259        }
260    }
261
262    /// Sets the offset for an already created topic partition. It will fail if the topic partition
263    /// isn't in the list.
264    pub fn set_partition_offset(
265        &mut self,
266        topic: &str,
267        partition: i32,
268        offset: Offset,
269    ) -> KafkaResult<()> {
270        let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
271        let kafka_err = match offset.to_raw() {
272            Some(offset) => unsafe {
273                rdsys::rd_kafka_topic_partition_list_set_offset(
274                    self.ptr(),
275                    topic_c.as_ptr(),
276                    partition,
277                    offset,
278                )
279            },
280            None => RDKafkaRespErr::RD_KAFKA_RESP_ERR__INVALID_ARG,
281        };
282
283        if kafka_err.is_error() {
284            Err(KafkaError::SetPartitionOffset(kafka_err.into()))
285        } else {
286            Ok(())
287        }
288    }
289
290    /// Adds a topic and partition to the list, with the specified offset.
291    pub fn add_partition_offset(
292        &mut self,
293        topic: &str,
294        partition: i32,
295        offset: Offset,
296    ) -> KafkaResult<()> {
297        self.add_partition(topic, partition);
298        self.set_partition_offset(topic, partition, offset)
299    }
300
301    /// Given a topic name and a partition number, returns the corresponding list element.
302    pub fn find_partition(
303        &self,
304        topic: &str,
305        partition: i32,
306    ) -> Option<TopicPartitionListElem<'_>> {
307        let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
308        let elem_ptr = unsafe {
309            rdsys::rd_kafka_topic_partition_list_find(self.ptr(), topic_c.as_ptr(), partition)
310        };
311        if elem_ptr.is_null() {
312            None
313        } else {
314            Some(unsafe { TopicPartitionListElem::from_ptr(self, &mut *elem_ptr) })
315        }
316    }
317
318    /// Sets all partitions in the list to the specified offset.
319    pub fn set_all_offsets(&mut self, offset: Offset) -> Result<(), KafkaError> {
320        let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
321        for elem_ptr in slice {
322            let mut elem = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
323            elem.set_offset(offset)?;
324        }
325        Ok(())
326    }
327
328    /// Returns all the elements of the list.
329    pub fn elements(&self) -> Vec<TopicPartitionListElem<'_>> {
330        let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
331        let mut vec = Vec::with_capacity(slice.len());
332        for elem_ptr in slice {
333            vec.push(TopicPartitionListElem::from_ptr(self, &mut *elem_ptr));
334        }
335        vec
336    }
337
338    /// Returns all the elements of the list that belong to the specified topic.
339    pub fn elements_for_topic<'a>(&'a self, topic: &str) -> Vec<TopicPartitionListElem<'a>> {
340        let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
341        let mut vec = Vec::with_capacity(slice.len());
342        for elem_ptr in slice {
343            let tp = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
344            if tp.topic() == topic {
345                vec.push(tp);
346            }
347        }
348        vec
349    }
350
351    /// Returns a hashmap-based representation of the list.
352    pub fn to_topic_map(&self) -> HashMap<(String, i32), Offset> {
353        self.elements()
354            .iter()
355            .map(|elem| ((elem.topic().to_owned(), elem.partition()), elem.offset()))
356            .collect()
357    }
358}
359
360impl PartialEq for TopicPartitionList {
361    fn eq(&self, other: &TopicPartitionList) -> bool {
362        if self.count() != other.count() {
363            return false;
364        }
365        self.elements().iter().all(|elem| {
366            if let Some(other_elem) = other.find_partition(elem.topic(), elem.partition()) {
367                elem == &other_elem
368            } else {
369                false
370            }
371        })
372    }
373}
374
375impl Default for TopicPartitionList {
376    fn default() -> Self {
377        Self::new()
378    }
379}
380
381impl fmt::Debug for TopicPartitionList {
382    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
383        write!(f, "TPL {{")?;
384        for (i, elem) in self.elements().iter().enumerate() {
385            if i > 0 {
386                write!(f, "; ")?;
387            }
388            write!(
389                f,
390                "{}/{}: offset={:?} metadata={:?}",
391                elem.topic(),
392                elem.partition(),
393                elem.offset(),
394                elem.metadata(),
395            )?;
396        }
397        write!(f, "}}")
398    }
399}
400
401unsafe impl Send for TopicPartitionList {}
402unsafe impl Sync for TopicPartitionList {}
403
404#[cfg(test)]
405mod tests {
406    use super::*;
407
408    use std::collections::HashMap;
409
410    #[test]
411    fn offset_conversion() {
412        assert_eq!(Offset::Offset(123).to_raw(), Some(123));
413        assert_eq!(Offset::from_raw(123), Offset::Offset(123));
414
415        assert_eq!(Offset::OffsetTail(10).to_raw(), Some(-2010));
416        assert_eq!(Offset::from_raw(-2010), Offset::OffsetTail(10));
417    }
418
419    #[test]
420    fn add_partition_offset_find() {
421        let mut tpl = TopicPartitionList::new();
422
423        tpl.add_partition("topic1", 0);
424        tpl.add_partition("topic1", 1);
425        tpl.add_partition("topic2", 0);
426        tpl.add_partition("topic2", 1);
427
428        tpl.set_partition_offset("topic1", 0, Offset::Offset(0))
429            .unwrap();
430        tpl.set_partition_offset("topic1", 1, Offset::Offset(1))
431            .unwrap();
432        tpl.set_partition_offset("topic2", 0, Offset::Offset(2))
433            .unwrap();
434        tpl.set_partition_offset("topic2", 1, Offset::Offset(3))
435            .unwrap();
436
437        assert_eq!(tpl.count(), 4);
438        assert!(tpl
439            .set_partition_offset("topic0", 3, Offset::Offset(0))
440            .is_err());
441        assert!(tpl
442            .set_partition_offset("topic3", 0, Offset::Offset(0))
443            .is_err());
444
445        let tp0 = tpl.find_partition("topic1", 0).unwrap();
446        let tp1 = tpl.find_partition("topic1", 1).unwrap();
447        let tp2 = tpl.find_partition("topic2", 0).unwrap();
448        let mut tp3 = tpl.find_partition("topic2", 1).unwrap();
449
450        assert_eq!(tp0.topic(), "topic1");
451        assert_eq!(tp0.partition(), 0);
452        assert_eq!(tp0.offset(), Offset::Offset(0));
453        assert_eq!(tp1.topic(), "topic1");
454        assert_eq!(tp1.partition(), 1);
455        assert_eq!(tp1.offset(), Offset::Offset(1));
456        assert_eq!(tp2.topic(), "topic2");
457        assert_eq!(tp2.partition(), 0);
458        assert_eq!(tp2.offset(), Offset::Offset(2));
459        assert_eq!(tp3.topic(), "topic2");
460        assert_eq!(tp3.partition(), 1);
461        assert_eq!(tp3.offset(), Offset::Offset(3));
462
463        tp3.set_offset(Offset::Offset(1234)).unwrap();
464        assert_eq!(tp3.offset(), Offset::Offset(1234));
465    }
466
467    #[test]
468    fn add_partition_range() {
469        let mut tpl = TopicPartitionList::new();
470
471        tpl.add_partition_range("topic1", 0, 3);
472
473        tpl.set_partition_offset("topic1", 0, Offset::Offset(0))
474            .unwrap();
475        tpl.set_partition_offset("topic1", 1, Offset::Offset(1))
476            .unwrap();
477        tpl.set_partition_offset("topic1", 2, Offset::Offset(2))
478            .unwrap();
479        tpl.set_partition_offset("topic1", 3, Offset::Offset(3))
480            .unwrap();
481        assert!(tpl
482            .set_partition_offset("topic1", 4, Offset::Offset(2))
483            .is_err());
484    }
485
486    #[test]
487    fn check_defaults() {
488        let mut tpl = TopicPartitionList::new();
489
490        tpl.add_partition("topic1", 0);
491
492        let tp = tpl.find_partition("topic1", 0).unwrap();
493        assert_eq!(tp.offset(), Offset::Invalid);
494    }
495
496    #[test]
497    fn test_add_partition_offset_clone() {
498        let mut tpl = TopicPartitionList::new();
499        tpl.add_partition_offset("topic1", 0, Offset::Offset(0))
500            .unwrap();
501        tpl.add_partition_offset("topic1", 1, Offset::Offset(1))
502            .unwrap();
503
504        let tp0 = tpl.find_partition("topic1", 0).unwrap();
505        let tp1 = tpl.find_partition("topic1", 1).unwrap();
506        assert_eq!(tp0.topic(), "topic1");
507        assert_eq!(tp0.partition(), 0);
508        assert_eq!(tp0.offset(), Offset::Offset(0));
509        assert_eq!(tp1.topic(), "topic1");
510        assert_eq!(tp1.partition(), 1);
511        assert_eq!(tp1.offset(), Offset::Offset(1));
512
513        let tpl_cloned = tpl.clone();
514        let tp0 = tpl_cloned.find_partition("topic1", 0).unwrap();
515        let tp1 = tpl_cloned.find_partition("topic1", 1).unwrap();
516        assert_eq!(tp0.topic(), "topic1");
517        assert_eq!(tp0.partition(), 0);
518        assert_eq!(tp0.offset(), Offset::Offset(0));
519        assert_eq!(tp1.topic(), "topic1");
520        assert_eq!(tp1.partition(), 1);
521        assert_eq!(tp1.offset(), Offset::Offset(1));
522    }
523
524    #[test]
525    fn test_topic_map() {
526        let mut topic_map = HashMap::new();
527        topic_map.insert(("topic1".to_string(), 0), Offset::Invalid);
528        topic_map.insert(("topic1".to_string(), 1), Offset::Offset(123));
529        topic_map.insert(("topic2".to_string(), 0), Offset::Beginning);
530
531        let tpl = TopicPartitionList::from_topic_map(&topic_map).unwrap();
532        let topic_map2 = tpl.to_topic_map();
533        let tpl2 = TopicPartitionList::from_topic_map(&topic_map2).unwrap();
534
535        assert_eq!(topic_map, topic_map2);
536        assert_eq!(tpl, tpl2);
537    }
538}