1use std::collections::HashMap;
6use std::ffi::{CStr, CString};
7use std::fmt;
8use std::slice;
9use std::str;
10
11use libc::c_void;
12use rdkafka_sys as rdsys;
13use rdkafka_sys::types::*;
14
15use crate::error::{IsError, KafkaError, KafkaResult};
16use crate::util::{self, KafkaDrop, NativePtr};
17
18const PARTITION_UNASSIGNED: i32 = -1;
19
20const OFFSET_BEGINNING: i64 = rdsys::RD_KAFKA_OFFSET_BEGINNING as i64;
21const OFFSET_END: i64 = rdsys::RD_KAFKA_OFFSET_END as i64;
22const OFFSET_STORED: i64 = rdsys::RD_KAFKA_OFFSET_STORED as i64;
23const OFFSET_INVALID: i64 = rdsys::RD_KAFKA_OFFSET_INVALID as i64;
24const OFFSET_TAIL_BASE: i64 = rdsys::RD_KAFKA_OFFSET_TAIL_BASE as i64;
25
26#[derive(Copy, Clone, Debug, PartialEq, Eq)]
28pub enum Offset {
29 Beginning,
31 End,
33 Stored,
35 Invalid,
37 Offset(i64),
42 OffsetTail(i64),
47}
48
49impl Offset {
50 pub fn from_raw(raw_offset: i64) -> Offset {
53 match raw_offset {
54 OFFSET_BEGINNING => Offset::Beginning,
55 OFFSET_END => Offset::End,
56 OFFSET_STORED => Offset::Stored,
57 OFFSET_INVALID => Offset::Invalid,
58 n if n <= OFFSET_TAIL_BASE => Offset::OffsetTail(-(n - OFFSET_TAIL_BASE)),
59 n => Offset::Offset(n),
60 }
61 }
62
63 pub fn to_raw(self) -> Option<i64> {
69 match self {
70 Offset::Beginning => Some(OFFSET_BEGINNING),
71 Offset::End => Some(OFFSET_END),
72 Offset::Stored => Some(OFFSET_STORED),
73 Offset::Invalid => Some(OFFSET_INVALID),
74 Offset::Offset(n) if n >= 0 => Some(n),
75 Offset::OffsetTail(n) if n > 0 => Some(OFFSET_TAIL_BASE - n),
76 Offset::Offset(_) | Offset::OffsetTail(_) => None,
77 }
78 }
79}
80
81pub struct TopicPartitionListElem<'a> {
84 ptr: &'a mut RDKafkaTopicPartition,
85}
86
87impl<'a> TopicPartitionListElem<'a> {
88 fn from_ptr(
90 _owner_list: &'a TopicPartitionList,
91 ptr: &'a mut RDKafkaTopicPartition,
92 ) -> TopicPartitionListElem<'a> {
93 TopicPartitionListElem { ptr }
94 }
95
96 pub fn topic(&self) -> &str {
98 unsafe {
99 let c_str = self.ptr.topic;
100 CStr::from_ptr(c_str)
101 .to_str()
102 .expect("Topic name is not UTF-8")
103 }
104 }
105
106 pub fn error(&self) -> KafkaResult<()> {
108 let kafka_err = self.ptr.err;
109 if kafka_err.is_error() {
110 Err(KafkaError::OffsetFetch(kafka_err.into()))
111 } else {
112 Ok(())
113 }
114 }
115
116 pub fn partition(&self) -> i32 {
118 self.ptr.partition
119 }
120
121 pub fn offset(&self) -> Offset {
123 let raw_offset = self.ptr.offset;
124 Offset::from_raw(raw_offset)
125 }
126
127 pub fn set_offset(&mut self, offset: Offset) -> KafkaResult<()> {
129 match offset.to_raw() {
130 Some(offset) => {
131 self.ptr.offset = offset;
132 Ok(())
133 }
134 None => Err(KafkaError::SetPartitionOffset(
135 RDKafkaErrorCode::InvalidArgument,
136 )),
137 }
138 }
139
140 pub fn metadata(&self) -> &str {
142 let bytes = unsafe { util::ptr_to_slice(self.ptr.metadata, self.ptr.metadata_size) };
143 str::from_utf8(bytes).expect("Metadata is not UTF-8")
144 }
145
146 pub fn set_metadata<M>(&mut self, metadata: M)
148 where
149 M: AsRef<str>,
150 {
151 let metadata = metadata.as_ref();
152 let buf = unsafe { libc::malloc(metadata.len()) };
153 unsafe { libc::memcpy(buf, metadata.as_ptr() as *const c_void, metadata.len()) };
154 self.ptr.metadata = buf;
155 self.ptr.metadata_size = metadata.len();
156 }
157}
158
159impl<'a> PartialEq for TopicPartitionListElem<'a> {
160 fn eq(&self, other: &TopicPartitionListElem<'a>) -> bool {
161 self.topic() == other.topic()
162 && self.partition() == other.partition()
163 && self.offset() == other.offset()
164 && self.metadata() == other.metadata()
165 }
166}
167
168pub struct TopicPartitionList {
170 ptr: NativePtr<RDKafkaTopicPartitionList>,
171}
172
173unsafe impl KafkaDrop for RDKafkaTopicPartitionList {
174 const TYPE: &'static str = "topic partition list";
175 const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_topic_partition_list_destroy;
176}
177
178impl Clone for TopicPartitionList {
179 fn clone(&self) -> Self {
180 let new_tpl = unsafe { rdsys::rd_kafka_topic_partition_list_copy(self.ptr()) };
181 unsafe { TopicPartitionList::from_ptr(new_tpl) }
182 }
183}
184
185impl TopicPartitionList {
186 pub fn new() -> TopicPartitionList {
188 TopicPartitionList::with_capacity(5)
189 }
190
191 pub fn with_capacity(capacity: usize) -> TopicPartitionList {
193 let ptr = unsafe { rdsys::rd_kafka_topic_partition_list_new(capacity as i32) };
194 unsafe { TopicPartitionList::from_ptr(ptr) }
195 }
196
197 pub(crate) unsafe fn from_ptr(ptr: *mut RDKafkaTopicPartitionList) -> TopicPartitionList {
200 TopicPartitionList {
201 ptr: NativePtr::from_ptr(ptr).unwrap(),
202 }
203 }
204
205 pub fn from_topic_map(
207 topic_map: &HashMap<(String, i32), Offset>,
208 ) -> KafkaResult<TopicPartitionList> {
209 let mut tpl = TopicPartitionList::with_capacity(topic_map.len());
210 for ((topic_name, partition), offset) in topic_map {
211 tpl.add_partition_offset(topic_name, *partition, *offset)?;
212 }
213 Ok(tpl)
214 }
215
216 pub fn ptr(&self) -> *mut RDKafkaTopicPartitionList {
218 self.ptr.ptr()
219 }
220
221 pub fn count(&self) -> usize {
223 self.ptr.cnt as usize
224 }
225
226 pub fn capacity(&self) -> usize {
228 self.ptr.size as usize
229 }
230
231 pub fn add_topic_unassigned<'a>(&'a mut self, topic: &str) -> TopicPartitionListElem<'a> {
233 self.add_partition(topic, PARTITION_UNASSIGNED)
234 }
235
236 pub fn add_partition<'a>(
238 &'a mut self,
239 topic: &str,
240 partition: i32,
241 ) -> TopicPartitionListElem<'a> {
242 let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
243 let tp_ptr = unsafe {
244 rdsys::rd_kafka_topic_partition_list_add(self.ptr(), topic_c.as_ptr(), partition)
245 };
246 unsafe { TopicPartitionListElem::from_ptr(self, &mut *tp_ptr) }
247 }
248
249 pub fn add_partition_range(&mut self, topic: &str, start_partition: i32, stop_partition: i32) {
251 let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
252 unsafe {
253 rdsys::rd_kafka_topic_partition_list_add_range(
254 self.ptr(),
255 topic_c.as_ptr(),
256 start_partition,
257 stop_partition,
258 );
259 }
260 }
261
262 pub fn set_partition_offset(
265 &mut self,
266 topic: &str,
267 partition: i32,
268 offset: Offset,
269 ) -> KafkaResult<()> {
270 let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
271 let kafka_err = match offset.to_raw() {
272 Some(offset) => unsafe {
273 rdsys::rd_kafka_topic_partition_list_set_offset(
274 self.ptr(),
275 topic_c.as_ptr(),
276 partition,
277 offset,
278 )
279 },
280 None => RDKafkaRespErr::RD_KAFKA_RESP_ERR__INVALID_ARG,
281 };
282
283 if kafka_err.is_error() {
284 Err(KafkaError::SetPartitionOffset(kafka_err.into()))
285 } else {
286 Ok(())
287 }
288 }
289
290 pub fn add_partition_offset(
292 &mut self,
293 topic: &str,
294 partition: i32,
295 offset: Offset,
296 ) -> KafkaResult<()> {
297 self.add_partition(topic, partition);
298 self.set_partition_offset(topic, partition, offset)
299 }
300
301 pub fn find_partition(
303 &self,
304 topic: &str,
305 partition: i32,
306 ) -> Option<TopicPartitionListElem<'_>> {
307 let topic_c = CString::new(topic).expect("Topic name is not UTF-8");
308 let elem_ptr = unsafe {
309 rdsys::rd_kafka_topic_partition_list_find(self.ptr(), topic_c.as_ptr(), partition)
310 };
311 if elem_ptr.is_null() {
312 None
313 } else {
314 Some(unsafe { TopicPartitionListElem::from_ptr(self, &mut *elem_ptr) })
315 }
316 }
317
318 pub fn set_all_offsets(&mut self, offset: Offset) -> Result<(), KafkaError> {
320 let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
321 for elem_ptr in slice {
322 let mut elem = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
323 elem.set_offset(offset)?;
324 }
325 Ok(())
326 }
327
328 pub fn elements(&self) -> Vec<TopicPartitionListElem<'_>> {
330 let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
331 let mut vec = Vec::with_capacity(slice.len());
332 for elem_ptr in slice {
333 vec.push(TopicPartitionListElem::from_ptr(self, &mut *elem_ptr));
334 }
335 vec
336 }
337
338 pub fn elements_for_topic<'a>(&'a self, topic: &str) -> Vec<TopicPartitionListElem<'a>> {
340 let slice = unsafe { slice::from_raw_parts_mut((*self.ptr).elems, self.count()) };
341 let mut vec = Vec::with_capacity(slice.len());
342 for elem_ptr in slice {
343 let tp = TopicPartitionListElem::from_ptr(self, &mut *elem_ptr);
344 if tp.topic() == topic {
345 vec.push(tp);
346 }
347 }
348 vec
349 }
350
351 pub fn to_topic_map(&self) -> HashMap<(String, i32), Offset> {
353 self.elements()
354 .iter()
355 .map(|elem| ((elem.topic().to_owned(), elem.partition()), elem.offset()))
356 .collect()
357 }
358}
359
360impl PartialEq for TopicPartitionList {
361 fn eq(&self, other: &TopicPartitionList) -> bool {
362 if self.count() != other.count() {
363 return false;
364 }
365 self.elements().iter().all(|elem| {
366 if let Some(other_elem) = other.find_partition(elem.topic(), elem.partition()) {
367 elem == &other_elem
368 } else {
369 false
370 }
371 })
372 }
373}
374
375impl Default for TopicPartitionList {
376 fn default() -> Self {
377 Self::new()
378 }
379}
380
381impl fmt::Debug for TopicPartitionList {
382 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
383 write!(f, "TPL {{")?;
384 for (i, elem) in self.elements().iter().enumerate() {
385 if i > 0 {
386 write!(f, "; ")?;
387 }
388 write!(
389 f,
390 "{}/{}: offset={:?} metadata={:?}",
391 elem.topic(),
392 elem.partition(),
393 elem.offset(),
394 elem.metadata(),
395 )?;
396 }
397 write!(f, "}}")
398 }
399}
400
401unsafe impl Send for TopicPartitionList {}
402unsafe impl Sync for TopicPartitionList {}
403
404#[cfg(test)]
405mod tests {
406 use super::*;
407
408 use std::collections::HashMap;
409
410 #[test]
411 fn offset_conversion() {
412 assert_eq!(Offset::Offset(123).to_raw(), Some(123));
413 assert_eq!(Offset::from_raw(123), Offset::Offset(123));
414
415 assert_eq!(Offset::OffsetTail(10).to_raw(), Some(-2010));
416 assert_eq!(Offset::from_raw(-2010), Offset::OffsetTail(10));
417 }
418
419 #[test]
420 fn add_partition_offset_find() {
421 let mut tpl = TopicPartitionList::new();
422
423 tpl.add_partition("topic1", 0);
424 tpl.add_partition("topic1", 1);
425 tpl.add_partition("topic2", 0);
426 tpl.add_partition("topic2", 1);
427
428 tpl.set_partition_offset("topic1", 0, Offset::Offset(0))
429 .unwrap();
430 tpl.set_partition_offset("topic1", 1, Offset::Offset(1))
431 .unwrap();
432 tpl.set_partition_offset("topic2", 0, Offset::Offset(2))
433 .unwrap();
434 tpl.set_partition_offset("topic2", 1, Offset::Offset(3))
435 .unwrap();
436
437 assert_eq!(tpl.count(), 4);
438 assert!(tpl
439 .set_partition_offset("topic0", 3, Offset::Offset(0))
440 .is_err());
441 assert!(tpl
442 .set_partition_offset("topic3", 0, Offset::Offset(0))
443 .is_err());
444
445 let tp0 = tpl.find_partition("topic1", 0).unwrap();
446 let tp1 = tpl.find_partition("topic1", 1).unwrap();
447 let tp2 = tpl.find_partition("topic2", 0).unwrap();
448 let mut tp3 = tpl.find_partition("topic2", 1).unwrap();
449
450 assert_eq!(tp0.topic(), "topic1");
451 assert_eq!(tp0.partition(), 0);
452 assert_eq!(tp0.offset(), Offset::Offset(0));
453 assert_eq!(tp1.topic(), "topic1");
454 assert_eq!(tp1.partition(), 1);
455 assert_eq!(tp1.offset(), Offset::Offset(1));
456 assert_eq!(tp2.topic(), "topic2");
457 assert_eq!(tp2.partition(), 0);
458 assert_eq!(tp2.offset(), Offset::Offset(2));
459 assert_eq!(tp3.topic(), "topic2");
460 assert_eq!(tp3.partition(), 1);
461 assert_eq!(tp3.offset(), Offset::Offset(3));
462
463 tp3.set_offset(Offset::Offset(1234)).unwrap();
464 assert_eq!(tp3.offset(), Offset::Offset(1234));
465 }
466
467 #[test]
468 fn add_partition_range() {
469 let mut tpl = TopicPartitionList::new();
470
471 tpl.add_partition_range("topic1", 0, 3);
472
473 tpl.set_partition_offset("topic1", 0, Offset::Offset(0))
474 .unwrap();
475 tpl.set_partition_offset("topic1", 1, Offset::Offset(1))
476 .unwrap();
477 tpl.set_partition_offset("topic1", 2, Offset::Offset(2))
478 .unwrap();
479 tpl.set_partition_offset("topic1", 3, Offset::Offset(3))
480 .unwrap();
481 assert!(tpl
482 .set_partition_offset("topic1", 4, Offset::Offset(2))
483 .is_err());
484 }
485
486 #[test]
487 fn check_defaults() {
488 let mut tpl = TopicPartitionList::new();
489
490 tpl.add_partition("topic1", 0);
491
492 let tp = tpl.find_partition("topic1", 0).unwrap();
493 assert_eq!(tp.offset(), Offset::Invalid);
494 }
495
496 #[test]
497 fn test_add_partition_offset_clone() {
498 let mut tpl = TopicPartitionList::new();
499 tpl.add_partition_offset("topic1", 0, Offset::Offset(0))
500 .unwrap();
501 tpl.add_partition_offset("topic1", 1, Offset::Offset(1))
502 .unwrap();
503
504 let tp0 = tpl.find_partition("topic1", 0).unwrap();
505 let tp1 = tpl.find_partition("topic1", 1).unwrap();
506 assert_eq!(tp0.topic(), "topic1");
507 assert_eq!(tp0.partition(), 0);
508 assert_eq!(tp0.offset(), Offset::Offset(0));
509 assert_eq!(tp1.topic(), "topic1");
510 assert_eq!(tp1.partition(), 1);
511 assert_eq!(tp1.offset(), Offset::Offset(1));
512
513 let tpl_cloned = tpl.clone();
514 let tp0 = tpl_cloned.find_partition("topic1", 0).unwrap();
515 let tp1 = tpl_cloned.find_partition("topic1", 1).unwrap();
516 assert_eq!(tp0.topic(), "topic1");
517 assert_eq!(tp0.partition(), 0);
518 assert_eq!(tp0.offset(), Offset::Offset(0));
519 assert_eq!(tp1.topic(), "topic1");
520 assert_eq!(tp1.partition(), 1);
521 assert_eq!(tp1.offset(), Offset::Offset(1));
522 }
523
524 #[test]
525 fn test_topic_map() {
526 let mut topic_map = HashMap::new();
527 topic_map.insert(("topic1".to_string(), 0), Offset::Invalid);
528 topic_map.insert(("topic1".to_string(), 1), Offset::Offset(123));
529 topic_map.insert(("topic2".to_string(), 0), Offset::Beginning);
530
531 let tpl = TopicPartitionList::from_topic_map(&topic_map).unwrap();
532 let topic_map2 = tpl.to_topic_map();
533 let tpl2 = TopicPartitionList::from_topic_map(&topic_map2).unwrap();
534
535 assert_eq!(topic_map, topic_map2);
536 assert_eq!(tpl, tpl2);
537 }
538}