1use std::collections::BTreeMap;
11use std::collections::btree_map::Entry;
12use std::str::{self};
13use std::sync::Arc;
14use std::thread;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use chrono::{DateTime, NaiveDateTime};
19use differential_dataflow::{AsCollection, Hashable};
20use futures::StreamExt;
21use itertools::Itertools;
22use maplit::btreemap;
23use mz_kafka_util::client::{
24 GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
25};
26use mz_ore::assert_none;
27use mz_ore::cast::CastFrom;
28use mz_ore::error::ErrorExt;
29use mz_ore::future::InTask;
30use mz_ore::iter::IteratorExt;
31use mz_repr::adt::timestamp::CheckedTimestamp;
32use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
33use mz_ssh_util::tunnel::SshTunnelStatus;
34use mz_storage_types::errors::{
35 ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
36};
37use mz_storage_types::sources::kafka::{
38 KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
39};
40use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
41use mz_timely_util::antichain::AntichainExt;
42use mz_timely_util::builder_async::{
43 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
44};
45use mz_timely_util::containers::stack::AccountedStackBuilder;
46use mz_timely_util::order::Partitioned;
47use rdkafka::consumer::base_consumer::PartitionQueue;
48use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
49use rdkafka::error::KafkaError;
50use rdkafka::message::{BorrowedMessage, Headers};
51use rdkafka::statistics::Statistics;
52use rdkafka::topic_partition_list::Offset;
53use rdkafka::{ClientContext, Message, TopicPartitionList};
54use serde::{Deserialize, Serialize};
55use timely::PartialOrder;
56use timely::container::CapacityContainerBuilder;
57use timely::dataflow::channels::pact::Pipeline;
58use timely::dataflow::operators::Capability;
59use timely::dataflow::operators::core::Partition;
60use timely::dataflow::operators::vec::Broadcast;
61use timely::dataflow::{Scope, StreamVec};
62use timely::progress::Antichain;
63use timely::progress::Timestamp;
64use tokio::sync::{Notify, mpsc};
65use tracing::{error, info, trace};
66
67use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
68use crate::metrics::source::kafka::KafkaSourceMetrics;
69use crate::source::types::{Probe, SignaledFuture, SourceRender, StackedCollection};
70use crate::source::{RawSourceCreationConfig, SourceMessage, probe};
71use crate::statistics::SourceStatistics;
72
73#[derive(
74 Clone,
75 Debug,
76 Default,
77 PartialEq,
78 Eq,
79 PartialOrd,
80 Ord,
81 Serialize,
82 Deserialize
83)]
84struct HealthStatus {
85 kafka: Option<HealthStatusUpdate>,
86 ssh: Option<HealthStatusUpdate>,
87}
88
89impl HealthStatus {
90 fn kafka(update: HealthStatusUpdate) -> Self {
91 Self {
92 kafka: Some(update),
93 ssh: None,
94 }
95 }
96
97 fn ssh(update: HealthStatusUpdate) -> Self {
98 Self {
99 kafka: None,
100 ssh: Some(update),
101 }
102 }
103}
104
/// Worker-local state for reading data from a Kafka topic.
pub struct KafkaSourceReader {
    /// Name of the topic this reader consumes from.
    topic_name: String,
    /// Name of the source, used in log and error messages.
    source_name: String,
    /// The ID of the source.
    id: GlobalId,
    /// The underlying rdkafka consumer; partition queues are split off of it.
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    /// One consumer per partition this worker reads from.
    partition_consumers: Vec<PartitionConsumer>,
    /// The index of this timely worker.
    worker_id: usize,
    /// The total count of timely workers.
    worker_count: usize,
    /// For each output index, the last offset handled per partition; used by
    /// `handle_message` to drop messages at or before an already-seen offset.
    last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
    /// The first offset to read for each partition this worker is responsible for.
    start_offsets: BTreeMap<PartitionId, i64>,
    /// Receiver of librdkafka statistics, forwarded by `GlueConsumerContext`.
    stats_rx: crossbeam_channel::Receiver<Jsonb>,
    /// Per-partition metrics for this source.
    partition_metrics: KafkaSourceMetrics,
    /// Per-partition capabilities, used to emit data and downgrade frontiers.
    partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
}
134
/// Holds the capability needed to emit data for a single partition.
struct PartitionCapability {
    /// The capability for emitting data at this partition's timestamps.
    data: Capability<KafkaTimestamp>,
}
139
/// A partition's high watermark, i.e. the offset one past the last available
/// message. NOTE(review): consumed here via `MetadataUpdate::Partitions`
/// values; producer of those updates is outside this view — confirm.
type HighWatermark = u64;
144
/// Commits resume-upper frontiers back to Kafka as consumer-group offsets and
/// updates committed-offset statistics for each export.
pub struct KafkaResumeUpperProcessor {
    /// The source's creation config; consulted for partition responsibility.
    config: RawSourceCreationConfig,
    /// The topic that offsets are committed against.
    topic_name: String,
    /// The consumer through which offsets are committed.
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    /// Statistics handles for every export of this source.
    statistics: Vec<SourceStatistics>,
}
153
154fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
158 let pid = usize::try_from(pid).expect("positive pid");
159 ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
160}
161
/// Per-export information the reader operator needs to route and decorate
/// messages.
struct SourceOutputInfo {
    /// The ID of the source export.
    id: GlobalId,
    /// Index of the operator output this export's data is sent to.
    output_index: usize,
    /// The frontier at which this export resumes.
    resume_upper: Antichain<KafkaTimestamp>,
    /// The metadata columns this export requests, in declaration order.
    metadata_columns: Vec<KafkaMetadataKind>,
}
168
impl SourceRender for KafkaSourceConnection {
    type Time = KafkaTimestamp;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;

    /// Renders the Kafka source dataflow: a metadata fetcher feeding a reader
    /// operator, whose single data stream is then partitioned into one
    /// collection per source export.
    fn render<G: Scope<Timestamp = KafkaTimestamp>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        StreamVec<G, HealthStatusMessage>,
        StreamVec<G, Probe<KafkaTimestamp>>,
        Vec<PressOnDropButton>,
    ) {
        // The metadata fetcher discovers partitions/watermarks; its output
        // drives the reader's frontier management.
        let (metadata, probes, metadata_token) =
            render_metadata_fetcher(scope, self.clone(), config.clone());
        let (data, health, reader_token) = render_reader(
            scope,
            self,
            config.clone(),
            resume_uppers,
            metadata,
            start_signal,
        );

        // Demultiplex the reader's single stream into one stream per export,
        // keyed by the output index the reader tagged each record with.
        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
            partition_count,
            |((output, data), time, diff)| {
                let output = u64::cast_from(*output);
                (output, (data.clone(), time.clone(), diff.clone()))
            },
        );
        // `zip_eq` asserts that exports and partitioned streams line up 1:1.
        let mut data_collections = BTreeMap::new();
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        (
            data_collections,
            health,
            probes,
            vec![metadata_token, reader_token],
        )
    }
}
222
223fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
228 scope: &G,
229 connection: KafkaSourceConnection,
230 config: RawSourceCreationConfig,
231 resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
232 metadata_stream: StreamVec<G, (mz_repr::Timestamp, MetadataUpdate)>,
233 start_signal: impl std::future::Future<Output = ()> + 'static,
234) -> (
235 StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
236 StreamVec<G, HealthStatusMessage>,
237 PressOnDropButton,
238) {
239 let name = format!("KafkaReader({})", config.id);
240 let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
241
242 let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
243 let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
244
245 let mut metadata_input = builder.new_disconnected_input(metadata_stream.broadcast(), Pipeline);
246
247 let mut outputs = vec![];
248
249 let mut all_export_stats = vec![];
251 let mut snapshot_export_stats = vec![];
252 for (idx, (id, export)) in config.source_exports.iter().enumerate() {
253 let SourceExport {
254 details,
255 storage_metadata: _,
256 data_config: _,
257 } = export;
258 let resume_upper = Antichain::from_iter(
259 config
260 .source_resume_uppers
261 .get(id)
262 .expect("all source exports must be present in source resume uppers")
263 .iter()
264 .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
265 );
266
267 let metadata_columns = match details {
268 SourceExportDetails::Kafka(details) => details
269 .metadata_columns
270 .iter()
271 .map(|(_name, kind)| kind.clone())
272 .collect::<Vec<_>>(),
273 _ => panic!("unexpected source export details: {:?}", details),
274 };
275
276 let statistics = config
277 .statistics
278 .get(id)
279 .expect("statistics have been initialized")
280 .clone();
281 if resume_upper.as_ref() == &[Partitioned::minimum()] {
283 snapshot_export_stats.push(statistics.clone());
284 }
285 all_export_stats.push(statistics);
286
287 let output = SourceOutputInfo {
288 id: *id,
289 resume_upper,
290 output_index: idx,
291 metadata_columns,
292 };
293 outputs.push(output);
294 }
295
296 let busy_signal = Arc::clone(&config.busy_signal);
297 let button = builder.build(move |caps| {
298 SignaledFuture::new(busy_signal, async move {
299 let [mut data_cap, health_cap] = caps.try_into().unwrap();
300
301 let client_id = connection.client_id(
302 config.config.config_set(),
303 &config.config.connection_context,
304 config.id,
305 );
306 let group_id = connection.group_id(&config.config.connection_context, config.id);
307 let KafkaSourceConnection {
308 connection,
309 topic,
310 topic_metadata_refresh_interval,
311 start_offsets,
312 metadata_columns: _,
313 connection_id: _, group_id_prefix: _, } = connection;
318
319 let mut start_offsets: BTreeMap<_, i64> = start_offsets
321 .clone()
322 .into_iter()
323 .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
324 .map(|(k, v)| (k, v))
325 .collect();
326
327 let mut partition_capabilities = BTreeMap::new();
328 let mut max_pid = None;
329 let resume_upper = Antichain::from_iter(
330 outputs
331 .iter()
332 .map(|output| output.resume_upper.clone())
333 .flatten(),
334 );
335
336 for ts in resume_upper.elements() {
337 if let Some(pid) = ts.interval().singleton() {
338 let pid = pid.unwrap_exact();
339 max_pid = std::cmp::max(max_pid, Some(*pid));
340 if responsible_for_pid(&config, *pid) {
341 let restored_offset = i64::try_from(ts.timestamp().offset)
342 .expect("restored kafka offsets must fit into i64");
343 if let Some(start_offset) = start_offsets.get_mut(pid) {
344 *start_offset = std::cmp::max(restored_offset, *start_offset);
345 } else {
346 start_offsets.insert(*pid, restored_offset);
347 }
348
349 let part_ts = Partitioned::new_singleton(
350 RangeBound::exact(*pid),
351 ts.timestamp().clone(),
352 );
353 let part_cap = PartitionCapability {
354 data: data_cap.delayed(&part_ts),
355 };
356 partition_capabilities.insert(*pid, part_cap);
357 }
358 }
359 }
360 let lower = max_pid
361 .map(RangeBound::after)
362 .unwrap_or(RangeBound::NegInfinity);
363 let future_ts =
364 Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
365 data_cap.downgrade(&future_ts);
366
367 info!(
368 source_id = config.id.to_string(),
369 worker_id = config.worker_id,
370 num_workers = config.worker_count,
371 "instantiating Kafka source reader at offsets {start_offsets:?}"
372 );
373
374 let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
375 let notificator = Arc::new(Notify::new());
376
377 let consumer: Result<BaseConsumer<_>, _> = connection
378 .create_with_context(
379 &config.config,
380 GlueConsumerContext {
381 notificator: Arc::clone(¬ificator),
382 stats_tx,
383 inner: MzClientContext::default(),
384 },
385 &btreemap! {
386 "enable.auto.commit" => "false".into(),
391 "auto.offset.reset" => "earliest".into(),
394 "topic.metadata.refresh.interval.ms" =>
397 topic_metadata_refresh_interval
398 .as_millis()
399 .to_string(),
400 "fetch.message.max.bytes" => "134217728".into(),
402 "group.id" => group_id.clone(),
408 "client.id" => client_id.clone(),
411 },
412 InTask::Yes,
413 )
414 .await;
415
416 let consumer = match consumer {
417 Ok(consumer) => Arc::new(consumer),
418 Err(e) => {
419 let update = HealthStatusUpdate::halting(
420 format!(
421 "failed creating kafka reader consumer: {}",
422 e.display_with_causes()
423 ),
424 None,
425 );
426 health_output.give(
427 &health_cap,
428 HealthStatusMessage {
429 id: None,
430 namespace: if matches!(e, ContextCreationError::Ssh(_)) {
431 StatusNamespace::Ssh
432 } else {
433 StatusNamespace::Kafka
434 },
435 update: update.clone(),
436 },
437 );
438 for (output, update) in outputs.iter().repeat_clone(update) {
439 health_output.give(
440 &health_cap,
441 HealthStatusMessage {
442 id: Some(output.id),
443 namespace: if matches!(e, ContextCreationError::Ssh(_)) {
444 StatusNamespace::Ssh
445 } else {
446 StatusNamespace::Kafka
447 },
448 update,
449 },
450 );
451 }
452 std::future::pending::<()>().await;
456 unreachable!("pending future never returns");
457 }
458 };
459
460 start_signal.await;
464 info!(
465 source_id = config.id.to_string(),
466 worker_id = config.worker_id,
467 num_workers = config.worker_count,
468 "kafka worker noticed rehydration is finished, starting partition queues..."
469 );
470
471 let partition_ids = start_offsets.keys().copied().collect();
472 let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);
473
474 let mut reader = KafkaSourceReader {
475 topic_name: topic.clone(),
476 source_name: config.name.clone(),
477 id: config.id,
478 partition_consumers: Vec::new(),
479 consumer: Arc::clone(&consumer),
480 worker_id: config.worker_id,
481 worker_count: config.worker_count,
482 last_offsets: outputs
483 .iter()
484 .map(|output| (output.output_index, BTreeMap::new()))
485 .collect(),
486 start_offsets,
487 stats_rx,
488 partition_metrics: config.metrics.get_kafka_source_metrics(
489 partition_ids,
490 topic.clone(),
491 config.id,
492 ),
493 partition_capabilities,
494 };
495
496 let offset_committer = KafkaResumeUpperProcessor {
497 config: config.clone(),
498 topic_name: topic.clone(),
499 consumer,
500 statistics: all_export_stats.clone(),
501 };
502
503 if !snapshot_export_stats.is_empty() {
505 if let Err(e) = offset_committer
506 .process_frontier(resume_upper.clone())
507 .await
508 {
509 offset_commit_metrics.offset_commit_failures.inc();
510 tracing::warn!(
511 %e,
512 "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
513 worker_id = config.worker_id,
514 source_id = config.id,
515 upper = resume_upper.pretty()
516 );
517 }
518 for statistics in config.statistics.values() {
522 statistics.set_snapshot_records_known(0);
523 statistics.set_snapshot_records_staged(0);
524 }
525 }
526
527 let resume_uppers_process_loop = async move {
528 tokio::pin!(resume_uppers);
529 while let Some(frontier) = resume_uppers.next().await {
530 if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
531 offset_commit_metrics.offset_commit_failures.inc();
532 tracing::warn!(
533 %e,
534 "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
535 worker_id = config.worker_id,
536 source_id = config.id,
537 upper = frontier.pretty()
538 );
539 }
540 }
541 std::future::pending::<()>().await;
546 };
547 tokio::pin!(resume_uppers_process_loop);
548
549 let mut metadata_update: Option<MetadataUpdate> = None;
550 let mut snapshot_total = None;
551
552 let max_wait_time =
553 mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
554 loop {
555 tokio::select! {
558 _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},
561
562 _ = metadata_input.ready() => {
563 let mut updates = Vec::new();
565 while let Some(event) = metadata_input.next_sync() {
566 if let Event::Data(_, mut data) = event {
567 updates.append(&mut data);
568 }
569 }
570 metadata_update = updates
571 .into_iter()
572 .max_by_key(|(ts, _)| *ts)
573 .map(|(_, update)| update);
574 }
575
576 _ = resume_uppers_process_loop.as_mut() => {},
580 }
581
582 match metadata_update.take() {
583 Some(MetadataUpdate::Partitions(partitions)) => {
584 let max_pid = partitions.keys().last().cloned();
585 let lower = max_pid
586 .map(RangeBound::after)
587 .unwrap_or(RangeBound::NegInfinity);
588 let future_ts = Partitioned::new_range(
589 lower,
590 RangeBound::PosInfinity,
591 MzOffset::from(0),
592 );
593
594 let mut offset_known = 0;
595 for (&pid, &high_watermark) in &partitions {
596 if responsible_for_pid(&config, pid) {
597 offset_known += high_watermark;
598 reader.ensure_partition(pid);
599 if let Entry::Vacant(entry) =
600 reader.partition_capabilities.entry(pid)
601 {
602 let start_offset = match reader.start_offsets.get(&pid) {
603 Some(&offset) => offset.try_into().unwrap(),
604 None => 0u64,
605 };
606 let part_since_ts = Partitioned::new_singleton(
607 RangeBound::exact(pid),
608 MzOffset::from(start_offset),
609 );
610
611 entry.insert(PartitionCapability {
612 data: data_cap.delayed(&part_since_ts),
613 });
614 }
615 }
616 }
617
618 if !snapshot_export_stats.is_empty() && snapshot_total.is_none() {
621 snapshot_total = Some(offset_known);
625 }
626
627 for output in &outputs {
631 for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
632 health_output.give(
633 &health_cap,
634 HealthStatusMessage {
635 id: Some(output.id),
636 namespace,
637 update: HealthStatusUpdate::running(),
638 },
639 );
640 }
641 }
642 for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
643 health_output.give(
644 &health_cap,
645 HealthStatusMessage {
646 id: None,
647 namespace,
648 update: HealthStatusUpdate::running(),
649 },
650 );
651 }
652
653 for export_stat in all_export_stats.iter() {
654 export_stat.set_offset_known(offset_known);
655 }
656
657 data_cap.downgrade(&future_ts);
658 }
659 Some(MetadataUpdate::TransientError(status)) => {
660 if let Some(update) = status.kafka {
661 health_output.give(
662 &health_cap,
663 HealthStatusMessage {
664 id: None,
665 namespace: StatusNamespace::Kafka,
666 update: update.clone(),
667 },
668 );
669 for (output, update) in outputs.iter().repeat_clone(update) {
670 health_output.give(
671 &health_cap,
672 HealthStatusMessage {
673 id: Some(output.id),
674 namespace: StatusNamespace::Kafka,
675 update,
676 },
677 );
678 }
679 }
680 if let Some(update) = status.ssh {
681 health_output.give(
682 &health_cap,
683 HealthStatusMessage {
684 id: None,
685 namespace: StatusNamespace::Ssh,
686 update: update.clone(),
687 },
688 );
689 for (output, update) in outputs.iter().repeat_clone(update) {
690 health_output.give(
691 &health_cap,
692 HealthStatusMessage {
693 id: Some(output.id),
694 namespace: StatusNamespace::Ssh,
695 update,
696 },
697 );
698 }
699 }
700 }
701 Some(MetadataUpdate::DefiniteError(error)) => {
702 health_output.give(
703 &health_cap,
704 HealthStatusMessage {
705 id: None,
706 namespace: StatusNamespace::Kafka,
707 update: HealthStatusUpdate::stalled(
708 error.to_string(),
709 None,
710 ),
711 },
712 );
713 let error = Err(error.into());
714 let time = data_cap.time().clone();
715 for (output, error) in
716 outputs.iter().map(|o| o.output_index).repeat_clone(error)
717 {
718 data_output
719 .give_fueled(&data_cap, ((output, error), time, Diff::ONE))
720 .await;
721 }
722
723 return;
724 }
725 None => {}
726 }
727
728 while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
736 match result {
737 Err(e) => {
738 let error = format!(
739 "kafka error when polling consumer for source: {} topic: {} : {}",
740 reader.source_name, reader.topic_name, e
741 );
742 let status = HealthStatusUpdate::stalled(error, None);
743 health_output.give(
744 &health_cap,
745 HealthStatusMessage {
746 id: None,
747 namespace: StatusNamespace::Kafka,
748 update: status.clone(),
749 },
750 );
751 for (output, status) in outputs.iter().repeat_clone(status) {
752 health_output.give(
753 &health_cap,
754 HealthStatusMessage {
755 id: Some(output.id),
756 namespace: StatusNamespace::Kafka,
757 update: status,
758 },
759 );
760 }
761 }
762 Ok(message) => {
763 let output_messages = outputs
764 .iter()
765 .map(|output| {
766 let (message, ts) = construct_source_message(
767 &message,
768 &output.metadata_columns,
769 );
770 (output.output_index, message, ts)
771 })
772 .collect::<Vec<_>>();
776 for (output_index, message, ts) in output_messages {
777 if let Some((msg, time, diff)) =
778 reader.handle_message(message, ts, &output_index)
779 {
780 let pid = time.interval().singleton().unwrap().unwrap_exact();
781 let part_cap = &reader.partition_capabilities[pid].data;
782 let msg = msg.map_err(|e| {
783 DataflowError::SourceError(Box::new(SourceError {
784 error: SourceErrorDetails::Other(e.to_string().into()),
785 }))
786 });
787 data_output
788 .give_fueled(part_cap, ((output_index, msg), time, diff))
789 .await;
790 }
791 }
792 }
793 }
794 }
795
796 reader.update_stats();
797
798 let mut consumers = std::mem::take(&mut reader.partition_consumers);
800 for consumer in consumers.iter_mut() {
801 let pid = consumer.pid();
802 let mut partition_exhausted = false;
808 for _ in 0..10_000 {
809 let Some(message) = consumer.get_next_message().transpose() else {
810 partition_exhausted = true;
811 break;
812 };
813
814 for output in outputs.iter() {
815 let message = match &message {
816 Ok((msg, pid)) => {
817 let (msg, ts) =
818 construct_source_message(msg, &output.metadata_columns);
819 assert_eq!(*pid, ts.0);
820 Ok(reader.handle_message(msg, ts, &output.output_index))
821 }
822 Err(err) => Err(err),
823 };
824 match message {
825 Ok(Some((msg, time, diff))) => {
826 let pid = time.interval().singleton().unwrap().unwrap_exact();
827 let part_cap = &reader.partition_capabilities[pid].data;
828 let msg = msg.map_err(|e| {
829 DataflowError::SourceError(Box::new(SourceError {
830 error: SourceErrorDetails::Other(e.to_string().into()),
831 }))
832 });
833 data_output
834 .give_fueled(
835 part_cap,
836 ((output.output_index, msg), time, diff),
837 )
838 .await;
839 }
840 Ok(None) => continue,
842 Err(err) => {
843 let last_offset = reader
844 .last_offsets
845 .get(&output.output_index)
846 .expect("output known to be installed")
847 .get(&pid)
848 .expect("partition known to be installed");
849
850 let status = HealthStatusUpdate::stalled(
851 format!(
852 "error consuming from source: {} topic: {topic}:\
853 partition: {pid} last processed offset:\
854 {last_offset} : {err}",
855 config.name
856 ),
857 None,
858 );
859 health_output.give(
860 &health_cap,
861 HealthStatusMessage {
862 id: None,
863 namespace: StatusNamespace::Kafka,
864 update: status.clone(),
865 },
866 );
867 health_output.give(
868 &health_cap,
869 HealthStatusMessage {
870 id: Some(output.id),
871 namespace: StatusNamespace::Kafka,
872 update: status,
873 },
874 );
875 }
876 }
877 }
878 }
879 if !partition_exhausted {
880 notificator.notify_one();
881 }
882 }
883 assert!(reader.partition_consumers.is_empty());
885 reader.partition_consumers = consumers;
886
887 let positions = reader.consumer.position().unwrap();
888 let topic_positions = positions.elements_for_topic(&reader.topic_name);
889 let mut snapshot_staged = 0;
890
891 for position in topic_positions {
892 if let Offset::Offset(offset) = position.offset() {
895 let pid = position.partition();
896 let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
897 let upper =
898 Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);
899
900 let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
901 match part_cap.data.try_downgrade(&upper) {
902 Ok(()) => {
903 if !snapshot_export_stats.is_empty() {
904 snapshot_staged += offset.try_into().unwrap_or(0u64);
907 if let Some(snapshot_total) = snapshot_total {
909 snapshot_staged =
912 std::cmp::min(snapshot_staged, snapshot_total);
913 }
914 }
915 }
916 Err(_) => {
917 info!(
920 source_id = config.id.to_string(),
921 worker_id = config.worker_id,
922 num_workers = config.worker_count,
923 "kafka source frontier downgrade skipped due to already \
924 seen offset: {:?}",
925 upper
926 );
927 }
928 };
929
930 }
931 }
932
933 if let (Some(snapshot_total), true) =
934 (snapshot_total, !snapshot_export_stats.is_empty())
935 {
936 for export_stat in snapshot_export_stats.iter() {
937 export_stat.set_snapshot_records_known(snapshot_total);
938 export_stat.set_snapshot_records_staged(snapshot_staged);
939 }
940 if snapshot_total == snapshot_staged {
941 snapshot_export_stats.clear();
942 }
943 }
944 }
945 })
946 });
947
948 (
949 stream.as_collection(),
950 health_stream,
951 button.press_on_drop(),
952 )
953}
954
955impl KafkaResumeUpperProcessor {
956 async fn process_frontier(
957 &self,
958 frontier: Antichain<KafkaTimestamp>,
959 ) -> Result<(), anyhow::Error> {
960 use rdkafka::consumer::CommitMode;
961
962 let mut offsets = vec![];
964 let mut offset_committed = 0;
965 for ts in frontier.iter() {
966 if let Some(pid) = ts.interval().singleton() {
967 let pid = pid.unwrap_exact();
968 if responsible_for_pid(&self.config, *pid) {
969 offsets.push((pid.clone(), *ts.timestamp()));
970
971 offset_committed += ts.timestamp().offset;
976 }
977 }
978 }
979
980 for export_stat in self.statistics.iter() {
981 export_stat.set_offset_committed(offset_committed);
982 }
983
984 if !offsets.is_empty() {
985 let mut tpl = TopicPartitionList::new();
986 for (pid, offset) in offsets {
987 let offset_to_commit =
988 Offset::Offset(offset.offset.try_into().expect("offset to be vald i64"));
989 tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
990 .expect("offset known to be valid");
991 }
992 let consumer = Arc::clone(&self.consumer);
993 mz_ore::task::spawn_blocking(
994 || format!("source({}) kafka offset commit", self.config.id),
995 move || consumer.commit(&tpl, CommitMode::Sync),
996 )
997 .await?;
998 }
999 Ok(())
1000 }
1001}
1002
impl KafkaSourceReader {
    /// Ensures a partition queue exists for `pid`, creating one (and seeding
    /// per-output offset tracking) if this is the first time the partition is
    /// seen. A no-op when the source has no outputs or the partition is
    /// already tracked.
    fn ensure_partition(&mut self, pid: PartitionId) {
        if self.last_offsets.is_empty() {
            tracing::info!(
                source_id = %self.id,
                worker_id = %self.worker_id,
                "kafka source does not have any outputs, not creating partition queue");

            return;
        }
        // All outputs track the same partition set, so any hit means the
        // partition queue already exists.
        for last_offsets in self.last_offsets.values() {
            if last_offsets.contains_key(&pid) {
                return;
            }
        }

        let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
        self.create_partition_queue(pid, Offset::Offset(start_offset));

        // Seed "last seen" one before the start so the first message at
        // `start_offset` passes `handle_message`'s staleness check.
        for last_offsets in self.last_offsets.values_mut() {
            let prev = last_offsets.insert(pid, start_offset - 1);
            assert_none!(prev);
        }
    }

    /// Creates a partition queue for `partition_id` starting at
    /// `initial_offset`, re-splitting the queues of all existing partitions
    /// since `assign` invalidates previously split queues.
    fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
        info!(
            source_id = self.id.to_string(),
            worker_id = self.worker_id,
            num_workers = self.worker_count,
            "activating Kafka queue for topic {}, partition {}",
            self.topic_name,
            partition_id,
        );

        // Rebuild the full assignment: all currently assigned partitions at
        // their current offsets, plus the new partition.
        let tpl = self.consumer.assignment().unwrap();
        let mut partition_list = TopicPartitionList::new();
        for partition in tpl.elements_for_topic(&self.topic_name) {
            partition_list
                .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
                .expect("offset known to be valid");
        }
        partition_list
            .add_partition_offset(&self.topic_name, partition_id, initial_offset)
            .expect("offset known to be valid");
        self.consumer
            .assign(&partition_list)
            .expect("assignment known to be valid");

        // Re-split every existing partition queue: the `assign` above
        // invalidated the old ones. Each queue wakes the operator when data
        // arrives.
        let context = Arc::clone(self.consumer.context());
        for pc in &mut self.partition_consumers {
            pc.partition_queue = self
                .consumer
                .split_partition_queue(&self.topic_name, pc.pid)
                .expect("partition known to be valid");
            pc.partition_queue.set_nonempty_callback({
                let context = Arc::clone(&context);
                move || context.inner().activate()
            });
        }

        let mut partition_queue = self
            .consumer
            .split_partition_queue(&self.topic_name, partition_id)
            .expect("partition known to be valid");
        partition_queue.set_nonempty_callback(move || context.inner().activate());
        self.partition_consumers
            .push(PartitionConsumer::new(partition_id, partition_queue));
        // Invariant: one partition consumer per assigned partition.
        assert_eq!(
            self.consumer
                .assignment()
                .unwrap()
                .elements_for_topic(&self.topic_name)
                .len(),
            self.partition_consumers.len()
        );
    }

    /// Drains any pending librdkafka statistics and updates the per-partition
    /// high-watermark metrics from them.
    fn update_stats(&mut self) {
        while let Ok(stats) = self.stats_rx.try_recv() {
            match serde_json::from_str::<Statistics>(&stats.to_string()) {
                Ok(statistics) => {
                    let topic = statistics.topics.get(&self.topic_name);
                    match topic {
                        Some(topic) => {
                            for (id, partition) in &topic.partitions {
                                self.partition_metrics
                                    .set_offset_max(*id, partition.hi_offset);
                            }
                        }
                        None => error!("No stats found for topic: {}", &self.topic_name),
                    }
                }
                Err(e) => {
                    error!("failed decoding librdkafka statistics JSON: {}", e);
                }
            }
        }
    }

    /// Checks a polled message against the last offset seen for its partition
    /// on the given output. Returns the message with its timestamp and diff
    /// when it advances the offset, or `None` (after logging) when it is a
    /// duplicate at or before an already-processed offset.
    fn handle_message(
        &mut self,
        message: Result<SourceMessage, KafkaHeaderParseError>,
        (partition, offset): (PartitionId, MzOffset),
        output_index: &usize,
    ) -> Option<(
        Result<SourceMessage, KafkaHeaderParseError>,
        KafkaTimestamp,
        Diff,
    )> {
        // The partition must have been installed by `ensure_partition` before
        // any message for it can arrive.
        assert!(
            self.last_offsets
                .get(output_index)
                .unwrap()
                .contains_key(&partition)
        );

        let last_offset_ref = self
            .last_offsets
            .get_mut(output_index)
            .expect("output known to be installed")
            .get_mut(&partition)
            .expect("partition known to be installed");

        let last_offset = *last_offset_ref;
        let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
        if offset_as_i64 <= last_offset {
            // Duplicate or stale delivery; skip it but leave a trace.
            info!(
                source_id = self.id.to_string(),
                worker_id = self.worker_id,
                num_workers = self.worker_count,
                "kafka message before expected offset: \
                 source {} (reading topic {}, partition {}, output {}) \
                 received offset {} expected offset {:?}",
                self.source_name,
                self.topic_name,
                partition,
                output_index,
                offset.offset,
                last_offset + 1,
            );
            None
        } else {
            *last_offset_ref = offset_as_i64;

            let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
            Some((message, ts, Diff::ONE))
        }
    }
}
1176
/// Converts a raw Kafka message into a `SourceMessage` with the requested
/// metadata columns packed in order, returning it along with the message's
/// partition and offset.
///
/// Returns an `Err` payload when a requested header is missing or fails UTF-8
/// decoding; the position tuple is returned in either case.
///
/// # Panics
///
/// Panics if the broker reports a negative offset or a message without a
/// timestamp.
fn construct_source_message(
    msg: &BorrowedMessage<'_>,
    metadata_columns: &[KafkaMetadataKind],
) -> (
    Result<SourceMessage, KafkaHeaderParseError>,
    (PartitionId, MzOffset),
) {
    let pid = msg.partition();
    let Ok(offset) = u64::try_from(msg.offset()) else {
        panic!(
            "got negative offset ({}) from otherwise non-error'd kafka message",
            msg.offset()
        );
    };

    // Pack the requested metadata columns in declaration order.
    let mut metadata = Row::default();
    let mut packer = metadata.packer();
    for kind in metadata_columns {
        match kind {
            KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
            KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
            KafkaMetadataKind::Timestamp => {
                let ts = msg
                    .timestamp()
                    .to_millis()
                    .expect("kafka sources always have upstream_time");

                // Out-of-range timestamps become NULL rather than erroring.
                let d: Datum = DateTime::from_timestamp_millis(ts)
                    .and_then(|dt| {
                        let ct: Option<CheckedTimestamp<NaiveDateTime>> =
                            dt.naive_utc().try_into().ok();
                        ct
                    })
                    .into();
                packer.push(d)
            }
            KafkaMetadataKind::Header { key, use_bytes } => {
                match msg.headers() {
                    Some(headers) => {
                        // When a key appears multiple times, the last
                        // occurrence wins.
                        let d = headers
                            .iter()
                            .filter(|header| header.key == key)
                            .last()
                            .map(|header| match header.value {
                                Some(v) => {
                                    if *use_bytes {
                                        Ok(Datum::Bytes(v))
                                    } else {
                                        match str::from_utf8(v) {
                                            Ok(str) => Ok(Datum::String(str)),
                                            Err(_) => Err(KafkaHeaderParseError::Utf8Error {
                                                key: key.clone(),
                                                raw: v.to_vec(),
                                            }),
                                        }
                                    }
                                }
                                // A present key with no value is NULL.
                                None => Ok(Datum::Null),
                            })
                            .unwrap_or_else(|| {
                                Err(KafkaHeaderParseError::KeyNotFound { key: key.clone() })
                            });
                        match d {
                            Ok(d) => packer.push(d),
                            Err(err) => return (Err(err), (pid, offset.into())),
                        }
                    }
                    None => packer.push(Datum::Null),
                }
            }
            KafkaMetadataKind::Headers => {
                // All headers as a list of (key, value) records; value is
                // NULL when the header carries no payload.
                packer.push_list_with(|r| {
                    if let Some(headers) = msg.headers() {
                        for header in headers.iter() {
                            match header.value {
                                Some(v) => r.push_list_with(|record_row| {
                                    record_row.push(Datum::String(header.key));
                                    record_row.push(Datum::Bytes(v));
                                }),
                                None => r.push_list_with(|record_row| {
                                    record_row.push(Datum::String(header.key));
                                    record_row.push(Datum::Null);
                                }),
                            }
                        }
                    }
                });
            }
        }
    }

    // Key and payload are passed through as raw bytes; absent ones are NULL.
    let key = match msg.key() {
        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
        None => Row::pack([Datum::Null]),
    };
    let value = match msg.payload() {
        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
        None => Row::pack([Datum::Null]),
    };
    (
        Ok(SourceMessage {
            key,
            value,
            metadata,
        }),
        (pid, offset.into()),
    )
}
1286
/// A reader for a single Kafka partition, pairing the partition id with its
/// split-off librdkafka partition queue.
struct PartitionConsumer {
    /// The id of the partition this consumer reads from.
    pid: PartitionId,
    /// The underlying librdkafka queue for this partition.
    partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
}
1294
1295impl PartitionConsumer {
1296 fn new(
1298 pid: PartitionId,
1299 partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1300 ) -> Self {
1301 PartitionConsumer {
1302 pid,
1303 partition_queue,
1304 }
1305 }
1306
1307 fn get_next_message(&self) -> Result<Option<(BorrowedMessage<'_>, PartitionId)>, KafkaError> {
1314 match self.partition_queue.poll(Duration::from_millis(0)) {
1315 Some(Ok(msg)) => Ok(Some((msg, self.pid))),
1316 Some(Err(err)) => Err(err),
1317 _ => Ok(None),
1318 }
1319 }
1320
1321 fn pid(&self) -> PartitionId {
1323 self.pid
1324 }
1325}
1326
/// A client context that forwards librdkafka statistics to the source
/// operator and signals it whenever new activity may be pending.
struct GlueConsumerContext {
    /// Signaled from librdkafka callbacks to wake up the source operator.
    notificator: Arc<Notify>,
    /// Channel over which decoded librdkafka statistics are forwarded.
    stats_tx: crossbeam_channel::Sender<Jsonb>,
    /// The shared Materialize client context that log/error calls delegate to.
    inner: MzClientContext,
}
1334
1335impl ClientContext for GlueConsumerContext {
1336 fn stats_raw(&self, statistics: &[u8]) {
1337 match Jsonb::from_slice(statistics) {
1338 Ok(statistics) => {
1339 self.stats_tx
1340 .send(statistics)
1341 .expect("timely operator hung up while Kafka source active");
1342 self.activate();
1343 }
1344 Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
1345 };
1346 }
1347
1348 fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
1351 self.inner.log(level, fac, log_message)
1352 }
1353 fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
1354 self.inner.error(error, reason)
1355 }
1356}
1357
impl GlueConsumerContext {
    /// Wakes up the source operator by signaling the shared notificator.
    fn activate(&self) {
        self.notificator.notify_one();
    }
}
1363
1364impl ConsumerContext for GlueConsumerContext {}
1365
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use mz_kafka_util::client::create_new_client_config_simple;
    use rdkafka::consumer::{BaseConsumer, Consumer};
    use rdkafka::{Message, Offset, TopicPartitionList};
    use uuid::Uuid;

    /// Demonstrates a race between a consumer's common message queue and a
    /// queue obtained via `split_partition_queue`: messages may still be
    /// delivered from the common queue even after the partition queue has been
    /// split off and has started yielding messages.
    ///
    /// Requires a Kafka broker at `localhost:9092` with a `queue-test` topic
    /// holding at least 1000 messages in partition 0 — presumably why the test
    /// is `#[ignore]`d by default.
    #[mz_ore::test]
    #[ignore]
    fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
        let topic_name = "queue-test";
        let pid = 0;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
        kafka_config.set("enable.auto.commit", "false");
        // A fresh group id so each run starts from the assigned offset rather
        // than a committed one.
        kafka_config.set("group.id", Uuid::new_v4().to_string());
        // A tiny fetch size — presumably to spread the messages over many
        // fetch requests and widen the race window; confirm if adjusting.
        kafka_config.set("fetch.message.max.bytes", "100");
        let consumer: BaseConsumer<_> = kafka_config.create()?;

        let consumer = Arc::new(consumer);

        let mut partition_list = TopicPartitionList::new();
        partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;

        consumer.assign(&partition_list)?;

        // Split off the partition queue immediately; any message that later
        // arrives via the common queue demonstrates the race.
        let partition_queue = consumer
            .split_partition_queue(topic_name, pid)
            .expect("missing partition queue");

        let expected_messages = 1_000;

        let mut common_queue_count = 0;
        let mut partition_queue_count = 0;

        // Drain both queues until all expected messages have been observed.
        loop {
            if let Some(msg) = consumer.poll(Duration::from_millis(0)) {
                match msg {
                    Ok(msg) => {
                        let _payload =
                            std::str::from_utf8(msg.payload().expect("missing payload"))?;
                        // Once the partition queue has produced a message, the
                        // common queue should never produce one again.
                        if partition_queue_count > 0 {
                            anyhow::bail!(
                                "Got message from common queue after we internally switched to partition queue."
                            );
                        }

                        common_queue_count += 1;
                    }
                    Err(err) => anyhow::bail!("{}", err),
                }
            }

            match partition_queue.poll(Duration::from_millis(0)) {
                Some(Ok(msg)) => {
                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
                    partition_queue_count += 1;
                }
                Some(Err(err)) => anyhow::bail!("{}", err),
                _ => (),
            }

            if (common_queue_count + partition_queue_count) == expected_messages {
                break;
            }
        }

        // The assertion fails (demonstrating the race) whenever any message
        // was delivered through the common queue.
        assert!(
            common_queue_count == 0,
            "Got {} out of {} messages from common queue. Partition queue: {}",
            common_queue_count,
            expected_messages,
            partition_queue_count
        );

        Ok(())
    }
}
1461
/// Fetches the set of partitions of `topic` and the current high watermark
/// (end offset) of each, using a single batched offset request.
///
/// Returns an error if the partition listing fails, the offset request fails,
/// or any partition reports something other than a concrete offset.
fn fetch_partition_info<C: ConsumerContext>(
    consumer: &BaseConsumer<C>,
    topic: &str,
    fetch_timeout: Duration,
) -> Result<BTreeMap<PartitionId, HighWatermark>, GetPartitionsError> {
    let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;

    // Build one request asking for the end offset of every partition.
    let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
    for pid in pids {
        offset_requests.add_partition_offset(topic, pid, Offset::End)?;
    }

    // NOTE(review): this relies on librdkafka resolving `Offset::End` entries
    // passed to `offsets_for_times` into each partition's current end offset —
    // confirm against the rdkafka/librdkafka documentation.
    let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;

    let mut result = BTreeMap::new();
    for entry in offset_responses.elements() {
        // Any response other than a concrete offset is unexpected here and is
        // surfaced as an error.
        let offset = match entry.offset() {
            Offset::Offset(offset) => offset,
            offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
        };

        let pid = entry.partition();
        // Raw offsets are signed; a negative value here would indicate a
        // broker or client bug, hence the hard expectation.
        let watermark = offset.try_into().expect("invalid negative offset");
        result.insert(pid, watermark);
    }

    Ok(result)
}
1491
/// An update yielded by the Kafka metadata fetcher.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum MetadataUpdate {
    /// The current set of partitions and the high watermark of each.
    Partitions(BTreeMap<PartitionId, HighWatermark>),
    /// A transient error, reported as a health status so ingestion can resume
    /// once the condition clears.
    TransientError(HealthStatus),
    /// A definite, unrecoverable source error.
    DefiniteError(SourceError),
}
1506
1507impl MetadataUpdate {
1508 fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
1510 match self {
1511 Self::Partitions(partitions) => {
1512 let max_pid = partitions.keys().last().copied();
1513 let lower = max_pid
1514 .map(RangeBound::after)
1515 .unwrap_or(RangeBound::NegInfinity);
1516 let future_ts =
1517 Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
1518
1519 let mut frontier = Antichain::from_elem(future_ts);
1520 for (pid, high_watermark) in partitions {
1521 frontier.insert(Partitioned::new_singleton(
1522 RangeBound::exact(*pid),
1523 MzOffset::from(*high_watermark),
1524 ));
1525 }
1526
1527 Some(frontier)
1528 }
1529 Self::DefiniteError(_) => Some(Antichain::new()),
1530 Self::TransientError(_) => None,
1531 }
1532 }
1533}
1534
/// An error produced while extracting a requested Kafka message header.
#[derive(Debug, thiserror::Error)]
pub enum KafkaHeaderParseError {
    /// The requested header key was not present among the message's headers.
    #[error("A header with key '{key}' was not found in the message headers")]
    KeyNotFound { key: String },
    /// The header value exists but is not valid UTF-8; `raw` holds the
    /// original bytes for diagnostics.
    #[error(
        "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
    )]
    Utf8Error { key: String, raw: Vec<u8> },
}
1544
1545fn render_metadata_fetcher<G: Scope<Timestamp = KafkaTimestamp>>(
1551 scope: &G,
1552 connection: KafkaSourceConnection,
1553 config: RawSourceCreationConfig,
1554) -> (
1555 StreamVec<G, (mz_repr::Timestamp, MetadataUpdate)>,
1556 StreamVec<G, Probe<KafkaTimestamp>>,
1557 PressOnDropButton,
1558) {
1559 let active_worker_id = usize::cast_from(config.id.hashed());
1560 let is_active_worker = active_worker_id % scope.peers() == scope.index();
1561
1562 let resume_upper = Antichain::from_iter(
1563 config
1564 .source_resume_uppers
1565 .values()
1566 .map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row))
1567 .flatten(),
1568 );
1569
1570 let name = format!("KafkaMetadataFetcher({})", config.id);
1571 let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
1572
1573 let (metadata_output, metadata_stream) =
1574 builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1575 let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1576
1577 let button = builder.build(move |caps| async move {
1578 if !is_active_worker {
1579 return;
1580 }
1581
1582 let [metadata_cap, probe_cap] = caps.try_into().unwrap();
1583
1584 let client_id = connection.client_id(
1585 config.config.config_set(),
1586 &config.config.connection_context,
1587 config.id,
1588 );
1589 let KafkaSourceConnection {
1590 connection,
1591 topic,
1592 topic_metadata_refresh_interval,
1593 ..
1594 } = connection;
1595
1596 let consumer: Result<BaseConsumer<_>, _> = connection
1597 .create_with_context(
1598 &config.config,
1599 MzClientContext::default(),
1600 &btreemap! {
1601 "topic.metadata.refresh.interval.ms" =>
1604 topic_metadata_refresh_interval
1605 .as_millis()
1606 .to_string(),
1607 "client.id" => format!("{client_id}-metadata"),
1610 },
1611 InTask::Yes,
1612 )
1613 .await;
1614
1615 let consumer = match consumer {
1616 Ok(consumer) => consumer,
1617 Err(e) => {
1618 let msg = format!(
1619 "failed creating kafka metadata consumer: {}",
1620 e.display_with_causes()
1621 );
1622 let status_update = HealthStatusUpdate::halting(msg, None);
1623 let status = match e {
1624 ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
1625 _ => HealthStatus::kafka(status_update),
1626 };
1627 let error = MetadataUpdate::TransientError(status);
1628 let timestamp = (config.now_fn)().into();
1629 metadata_output.give(&metadata_cap, (timestamp, error));
1630
1631 std::future::pending::<()>().await;
1635 unreachable!("pending future never returns");
1636 }
1637 };
1638
1639 let (tx, mut rx) = mpsc::unbounded_channel();
1640 spawn_metadata_thread(config, consumer, topic, tx);
1641
1642 let mut prev_upstream_frontier = resume_upper;
1643
1644 while let Some((timestamp, mut update)) = rx.recv().await {
1645 if prev_upstream_frontier.is_empty() {
1646 return;
1647 }
1648
1649 if let Some(upstream_frontier) = update.upstream_frontier() {
1650 if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
1660 let error = SourceError {
1661 error: SourceErrorDetails::Other("topic was recreated".into()),
1662 };
1663 update = MetadataUpdate::DefiniteError(error);
1664 }
1665 }
1666
1667 if let Some(upstream_frontier) = update.upstream_frontier() {
1668 prev_upstream_frontier = upstream_frontier.clone();
1669
1670 let probe = Probe {
1671 probe_ts: timestamp,
1672 upstream_frontier,
1673 };
1674 probe_output.give(&probe_cap, probe);
1675 }
1676
1677 metadata_output.give(&metadata_cap, (timestamp, update));
1678 }
1679 });
1680
1681 (metadata_stream, probe_stream, button.press_on_drop())
1682}
1683
1684fn spawn_metadata_thread<C: ConsumerContext>(
1685 config: RawSourceCreationConfig,
1686 consumer: BaseConsumer<TunnelingClientContext<C>>,
1687 topic: String,
1688 tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
1689) {
1690 thread::Builder::new()
1692 .name(format!("kfk-mtdt-{}", config.id))
1693 .spawn(move || {
1694 trace!(
1695 source_id = config.id.to_string(),
1696 worker_id = config.worker_id,
1697 num_workers = config.worker_count,
1698 "kafka metadata thread: starting..."
1699 );
1700
1701 let timestamp_interval = config.timestamp_interval;
1702 let mut ticker = probe::Ticker::new(move || timestamp_interval, config.now_fn);
1703
1704 loop {
1705 let probe_ts = ticker.tick_blocking();
1706 let result = fetch_partition_info(
1707 &consumer,
1708 &topic,
1709 config
1710 .config
1711 .parameters
1712 .kafka_timeout_config
1713 .fetch_metadata_timeout,
1714 );
1715 trace!(
1716 source_id = config.id.to_string(),
1717 worker_id = config.worker_id,
1718 num_workers = config.worker_count,
1719 "kafka metadata thread: metadata fetch result: {:?}",
1720 result
1721 );
1722 let update = match result {
1723 Ok(partitions) => {
1724 trace!(
1725 source_id = config.id.to_string(),
1726 worker_id = config.worker_id,
1727 num_workers = config.worker_count,
1728 "kafka metadata thread: fetched partition metadata info",
1729 );
1730
1731 MetadataUpdate::Partitions(partitions)
1732 }
1733 Err(GetPartitionsError::TopicDoesNotExist) => {
1734 let error = SourceError {
1735 error: SourceErrorDetails::Other("topic was deleted".into()),
1736 };
1737 MetadataUpdate::DefiniteError(error)
1738 }
1739 Err(e) => {
1740 let kafka_status = Some(HealthStatusUpdate::stalled(
1741 format!("{}", e.display_with_causes()),
1742 None,
1743 ));
1744
1745 let ssh_status = consumer.client().context().tunnel_status();
1746 let ssh_status = match ssh_status {
1747 SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
1748 SshTunnelStatus::Errored(e) => {
1749 Some(HealthStatusUpdate::stalled(e, None))
1750 }
1751 };
1752
1753 MetadataUpdate::TransientError(HealthStatus {
1754 kafka: kafka_status,
1755 ssh: ssh_status,
1756 })
1757 }
1758 };
1759
1760 if tx.send((probe_ts, update)).is_err() {
1761 break;
1762 }
1763 }
1764
1765 info!(
1766 source_id = config.id.to_string(),
1767 worker_id = config.worker_id,
1768 num_workers = config.worker_count,
1769 "kafka metadata thread: receiver has gone away; shutting down."
1770 )
1771 })
1772 .unwrap();
1773}