use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::convert::Infallible;
use std::str::{self};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;

use anyhow::anyhow;
use chrono::{DateTime, NaiveDateTime};
use differential_dataflow::{AsCollection, Hashable};
use futures::StreamExt;
use itertools::Itertools;
use maplit::btreemap;
use mz_kafka_util::client::{
    GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
use mz_ssh_util::tunnel::SshTunnelStatus;
use mz_storage_types::dyncfgs::KAFKA_METADATA_FETCH_INTERVAL;
use mz_storage_types::errors::{
    ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
};
use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use mz_timely_util::order::Partitioned;
use rdkafka::consumer::base_consumer::PartitionQueue;
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
use rdkafka::error::KafkaError;
use rdkafka::message::{BorrowedMessage, Headers};
use rdkafka::statistics::Statistics;
use rdkafka::topic_partition_list::Offset;
use rdkafka::{ClientContext, Message, TopicPartitionList};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::core::Partition;
use timely::dataflow::operators::{Broadcast, Capability};
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use timely::progress::Timestamp;
use tokio::sync::{Notify, mpsc};
use tracing::{error, info, trace};

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::metrics::source::kafka::KafkaSourceMetrics;
use crate::source::types::{
    Probe, ProgressStatisticsUpdate, SignaledFuture, SourceRender, StackedCollection,
};
use crate::source::{RawSourceCreationConfig, SourceMessage, probe};

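/// Health status updates for the two failure domains of a Kafka source: the
/// Kafka connection itself and the optional SSH tunnel in front of it.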
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
struct HealthStatus {
    kafka: Option<HealthStatusUpdate>,
    ssh: Option<HealthStatusUpdate>,
}

impl HealthStatus {
    fn kafka(update: HealthStatusUpdate) -> Self {
        Self {
            kafka: Some(update),
            ssh: None,
        }
    }

    fn ssh(update: HealthStatusUpdate) -> Self {
        Self {
            kafka: None,
            ssh: Some(update),
        }
    }
}

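/// Per-worker state of the Kafka source reader: the underlying consumer, its
/// split-off partition queues, per-output offset tracking, and the
/// capabilities used to timestamp emitted data.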
pub struct KafkaSourceReader {
    topic_name: String,
    source_name: String,
    id: GlobalId,
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    partition_consumers: Vec<PartitionConsumer>,
    worker_id: usize,
    worker_count: usize,
    last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
    start_offsets: BTreeMap<PartitionId, i64>,
    stats_rx: crossbeam_channel::Receiver<Jsonb>,
    progress_statistics: Arc<Mutex<PartialProgressStatistics>>,
    partition_metrics: KafkaSourceMetrics,
    partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
}

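/// A partially-filled snapshot of progress statistics. Values accumulate here
/// and are emitted as a `ProgressStatisticsUpdate::SteadyState` once both
/// sides are known.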
#[derive(Default)]
struct PartialProgressStatistics {
    offset_known: Option<u64>,
    offset_committed: Option<u64>,
}

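/// Capabilities for a single partition: one for emitting data and one for
/// driving the progress (upper) frontier.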
struct PartitionCapability {
    data: Capability<KafkaTimestamp>,
    progress: Capability<KafkaTimestamp>,
}

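/// The high watermark offset of a Kafka partition.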
type HighWatermark = u64;

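/// Commits observed resume frontiers back to Kafka as consumer group offsets.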
pub struct KafkaResumeUpperProcessor {
    config: RawSourceCreationConfig,
    topic_name: String,
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    progress_statistics: Arc<Mutex<PartialProgressStatistics>>,
}

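/// Determines whether this worker is responsible for the given partition,
/// spreading partitions across workers starting from the worker responsible
/// for the source itself.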
fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
    let pid = usize::try_from(pid).expect("positive pid");
    ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
}

struct SourceOutputInfo {
    id: GlobalId,
    output_index: usize,
    resume_upper: Antichain<KafkaTimestamp>,
    metadata_columns: Vec<KafkaMetadataKind>,
}

impl SourceRender for KafkaSourceConnection {
    type Time = KafkaTimestamp;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;

    fn render<G: Scope<Timestamp = KafkaTimestamp>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Stream<G, ProgressStatisticsUpdate>,
        Option<Stream<G, Probe<KafkaTimestamp>>>,
        Vec<PressOnDropButton>,
    ) {
        let (metadata, probes, metadata_token) =
            render_metadata_fetcher(scope, self.clone(), config.clone());
        let (data, progress, health, stats, reader_token) = render_reader(
            scope,
            self,
            config.clone(),
            resume_uppers,
            metadata,
            start_signal,
        );

        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
            partition_count,
            |((output, data), time, diff): &(
                (usize, Result<SourceMessage, DataflowError>),
                _,
                Diff,
            )| {
                let output = u64::cast_from(*output);
                (output, (data.clone(), time.clone(), diff.clone()))
            },
        );
        let mut data_collections = BTreeMap::new();
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        (
            data_collections,
            progress,
            health,
            stats,
            Some(probes),
            vec![metadata_token, reader_token],
        )
    }
}

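/// Renders the reader operator, which polls the Kafka consumer and its
/// partition queues and emits the decoded messages, progress updates, health
/// statuses, and progress statistics for every source export.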
fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
    scope: &G,
    connection: KafkaSourceConnection,
    config: RawSourceCreationConfig,
    resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
    metadata_stream: Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
    start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    Stream<G, ProgressStatisticsUpdate>,
    PressOnDropButton,
) {
    let name = format!("KafkaReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());

    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (health_output, health_stream) = builder.new_output();
    let (stats_output, stats_stream) = builder.new_output();

    let mut metadata_input = builder.new_disconnected_input(&metadata_stream.broadcast(), Pipeline);

    let mut outputs = vec![];
    for (idx, (id, export)) in config.source_exports.iter().enumerate() {
        let SourceExport {
            details,
            storage_metadata: _,
            data_config: _,
        } = export;
        let resume_upper = Antichain::from_iter(
            config
                .source_resume_uppers
                .get(id)
                .expect("all source exports must be present in source resume uppers")
                .iter()
                .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
        );

        let metadata_columns = match details {
            SourceExportDetails::Kafka(details) => details
                .metadata_columns
                .iter()
                .map(|(_name, kind)| kind.clone())
                .collect::<Vec<_>>(),
            SourceExportDetails::None => {
                continue;
            }
            _ => panic!("unexpected source export details: {:?}", details),
        };

        let output = SourceOutputInfo {
            id: *id,
            resume_upper,
            output_index: idx,
            metadata_columns,
        };
        outputs.push(output);
    }

    let busy_signal = Arc::clone(&config.busy_signal);
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut data_cap, mut progress_cap, health_cap, stats_cap] = caps.try_into().unwrap();

            let client_id = connection.client_id(
                config.config.config_set(),
                &config.config.connection_context,
                config.id,
            );
            let group_id = connection.group_id(&config.config.connection_context, config.id);
            let KafkaSourceConnection {
                connection,
                topic,
                topic_metadata_refresh_interval,
                start_offsets,
                metadata_columns: _,
                connection_id: _,
                group_id_prefix: _,
            } = connection;

            let mut start_offsets: BTreeMap<_, i64> = start_offsets
                .clone()
                .into_iter()
                .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
                .map(|(k, v)| (k, v))
                .collect();

            let mut partition_capabilities = BTreeMap::new();
            let mut max_pid = None;
            let resume_upper = Antichain::from_iter(
                outputs
                    .iter()
                    .map(|output| output.resume_upper.clone())
                    .flatten(),
            );

            let mut is_snapshotting = &*resume_upper == &[Partitioned::minimum()];

            for ts in resume_upper.elements() {
                if let Some(pid) = ts.interval().singleton() {
                    let pid = pid.unwrap_exact();
                    max_pid = std::cmp::max(max_pid, Some(*pid));
                    if responsible_for_pid(&config, *pid) {
                        let restored_offset = i64::try_from(ts.timestamp().offset)
                            .expect("restored kafka offsets must fit into i64");
                        if let Some(start_offset) = start_offsets.get_mut(pid) {
                            *start_offset = std::cmp::max(restored_offset, *start_offset);
                        } else {
                            start_offsets.insert(*pid, restored_offset);
                        }

                        let part_ts = Partitioned::new_singleton(
                            RangeBound::exact(*pid),
                            ts.timestamp().clone(),
                        );
                        let part_cap = PartitionCapability {
                            data: data_cap.delayed(&part_ts),
                            progress: progress_cap.delayed(&part_ts),
                        };
                        partition_capabilities.insert(*pid, part_cap);
                    }
                }
            }
            let lower = max_pid
                .map(RangeBound::after)
                .unwrap_or(RangeBound::NegInfinity);
            let future_ts =
                Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
            data_cap.downgrade(&future_ts);
            progress_cap.downgrade(&future_ts);

            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "instantiating Kafka source reader at offsets {start_offsets:?}"
            );

            let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
            let notificator = Arc::new(Notify::new());

            let consumer: Result<BaseConsumer<_>, _> = connection
                .create_with_context(
                    &config.config,
                    GlueConsumerContext {
                        notificator: Arc::clone(&notificator),
                        stats_tx,
                        inner: MzClientContext::default(),
                    },
                    &btreemap! {
                        "enable.auto.commit" => "false".into(),
                        "auto.offset.reset" => "earliest".into(),
                        "topic.metadata.refresh.interval.ms" =>
                            topic_metadata_refresh_interval
                                .as_millis()
                                .to_string(),
                        "fetch.message.max.bytes" => "134217728".into(),
                        "group.id" => group_id.clone(),
                        "client.id" => client_id.clone(),
                    },
                    InTask::Yes,
                )
                .await;

            let consumer = match consumer {
                Ok(consumer) => Arc::new(consumer),
                Err(e) => {
                    let update = HealthStatusUpdate::halting(
                        format!(
                            "failed creating kafka reader consumer: {}",
                            e.display_with_causes()
                        ),
                        None,
                    );
                    for (output, update) in outputs.iter().repeat_clone(update) {
                        health_output.give(
                            &health_cap,
                            HealthStatusMessage {
                                id: Some(output.id),
                                namespace: if matches!(e, ContextCreationError::Ssh(_)) {
                                    StatusNamespace::Ssh
                                } else {
                                    StatusNamespace::Kafka
                                },
                                update,
                            },
                        );
                    }
                    std::future::pending::<()>().await;
                    unreachable!("pending future never returns");
                }
            };

            start_signal.await;
            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka worker noticed rehydration is finished, starting partition queues..."
            );

            let partition_ids = start_offsets.keys().copied().collect();
            let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);

            let mut reader = KafkaSourceReader {
                topic_name: topic.clone(),
                source_name: config.name.clone(),
                id: config.id,
                partition_consumers: Vec::new(),
                consumer: Arc::clone(&consumer),
                worker_id: config.worker_id,
                worker_count: config.worker_count,
                last_offsets: outputs
                    .iter()
                    .map(|output| (output.output_index, BTreeMap::new()))
                    .collect(),
                start_offsets,
                stats_rx,
                progress_statistics: Default::default(),
                partition_metrics: config.metrics.get_kafka_source_metrics(
                    partition_ids,
                    topic.clone(),
                    config.id,
                ),
                partition_capabilities,
            };

            let offset_committer = KafkaResumeUpperProcessor {
                config: config.clone(),
                topic_name: topic.clone(),
                consumer,
                progress_statistics: Arc::clone(&reader.progress_statistics),
            };

            if is_snapshotting {
                if let Err(e) = offset_committer
                    .process_frontier(resume_upper.clone())
                    .await
                {
                    offset_commit_metrics.offset_commit_failures.inc();
                    tracing::warn!(
                        %e,
                        "timely-{} source({}) failed to commit offsets: resume_upper={}",
                        config.id,
                        config.worker_id,
                        resume_upper.pretty()
                    );
                }
            }

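            // Commit offsets for every new resume frontier we observe. The loop never
            // completes on its own so the `select!` below can keep polling it.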
            let resume_uppers_process_loop = async move {
                tokio::pin!(resume_uppers);
                while let Some(frontier) = resume_uppers.next().await {
                    if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
                        offset_commit_metrics.offset_commit_failures.inc();
                        tracing::warn!(
                            %e,
                            "timely-{} source({}) failed to commit offsets: resume_upper={}",
                            config.id,
                            config.worker_id,
                            frontier.pretty()
                        );
                    }
                }
                std::future::pending::<()>().await;
            };
            tokio::pin!(resume_uppers_process_loop);

            let mut prev_offset_known = None;
            let mut prev_offset_committed = None;
            let mut metadata_update: Option<MetadataUpdate> = None;
            let mut snapshot_total = None;

            let max_wait_time =
                mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
            loop {
                tokio::select! {
                    _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},

                    _ = metadata_input.ready() => {
                        let mut updates = Vec::new();
                        while let Some(event) = metadata_input.next_sync() {
                            if let Event::Data(_, mut data) = event {
                                updates.append(&mut data);
                            }
                        }
                        metadata_update = updates
                            .into_iter()
                            .max_by_key(|(ts, _)| *ts)
                            .map(|(_, update)| update);
                    }

                    _ = resume_uppers_process_loop.as_mut() => {},
                }

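                // Apply the latest metadata update, if any: open capabilities for newly
                // discovered partitions, report transient errors as health updates, and
                // emit definite errors as data before shutting down.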
                match metadata_update.take() {
                    Some(MetadataUpdate::Partitions(partitions)) => {
                        let max_pid = partitions.keys().last().cloned();
                        let lower = max_pid
                            .map(RangeBound::after)
                            .unwrap_or(RangeBound::NegInfinity);
                        let future_ts = Partitioned::new_range(
                            lower,
                            RangeBound::PosInfinity,
                            MzOffset::from(0),
                        );

                        let mut upstream_stat = 0;
                        for (&pid, &high_watermark) in &partitions {
                            if responsible_for_pid(&config, pid) {
                                upstream_stat += high_watermark;
                                reader.ensure_partition(pid);
                                if let Entry::Vacant(entry) =
                                    reader.partition_capabilities.entry(pid)
                                {
                                    let start_offset = match reader.start_offsets.get(&pid) {
                                        Some(&offset) => offset.try_into().unwrap(),
                                        None => 0u64,
                                    };
                                    let part_since_ts = Partitioned::new_singleton(
                                        RangeBound::exact(pid),
                                        MzOffset::from(start_offset),
                                    );
                                    let part_upper_ts = Partitioned::new_singleton(
                                        RangeBound::exact(pid),
                                        MzOffset::from(high_watermark),
                                    );

                                    entry.insert(PartitionCapability {
                                        data: data_cap.delayed(&part_since_ts),
                                        progress: progress_cap.delayed(&part_upper_ts),
                                    });
                                }
                            }
                        }

                        if is_snapshotting && snapshot_total.is_none() {
                            snapshot_total = Some(upstream_stat);
                        }

                        for output in &outputs {
                            for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace,
                                        update: HealthStatusUpdate::running(),
                                    },
                                );
                            }
                        }

                        let mut progress_statistics =
                            reader.progress_statistics.lock().expect("poisoned");
                        progress_statistics.offset_known = Some(upstream_stat);
                        data_cap.downgrade(&future_ts);
                        progress_cap.downgrade(&future_ts);
                    }
                    Some(MetadataUpdate::TransientError(status)) => {
                        if let Some(update) = status.kafka {
                            for (output, update) in outputs.iter().repeat_clone(update) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Kafka,
                                        update,
                                    },
                                );
                            }
                        }
                        if let Some(update) = status.ssh {
                            for (output, update) in outputs.iter().repeat_clone(update) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Ssh,
                                        update,
                                    },
                                );
                            }
                        }
                    }
                    Some(MetadataUpdate::DefiniteError(error)) => {
                        let error = Err(error.into());
                        let time = data_cap.time().clone();
                        for (output, error) in
                            outputs.iter().map(|o| o.output_index).repeat_clone(error)
                        {
                            data_output
                                .give_fueled(&data_cap, ((output, error), time, Diff::ONE))
                                .await;
                        }

                        return;
                    }
                    None => {}
                }

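                // Drain the consumer's main queue, forwarding any data it carries and
                // reporting poll errors as stalled health statuses.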
                while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
                    match result {
                        Err(e) => {
                            let error = format!(
                                "kafka error when polling consumer for source: {} topic: {} : {}",
                                reader.source_name, reader.topic_name, e
                            );
                            let status = HealthStatusUpdate::stalled(error, None);
                            for (output, status) in outputs.iter().repeat_clone(status) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Kafka,
                                        update: status,
                                    },
                                );
                            }
                        }
                        Ok(message) => {
                            let output_messages = outputs
                                .iter()
                                .map(|output| {
                                    let (message, ts) = construct_source_message(
                                        &message,
                                        &output.metadata_columns,
                                    );
                                    (output.output_index, message, ts)
                                })
                                .collect::<Vec<_>>();
                            for (output_index, message, ts) in output_messages {
                                if let Some((msg, time, diff)) =
                                    reader.handle_message(message, ts, &output_index)
                                {
                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
                                    let part_cap = &reader.partition_capabilities[pid].data;
                                    let msg = msg.map_err(|e| {
                                        DataflowError::SourceError(Box::new(SourceError {
                                            error: SourceErrorDetails::Other(e.to_string().into()),
                                        }))
                                    });
                                    data_output
                                        .give_fueled(part_cap, ((output_index, msg), time, diff))
                                        .await;
                                }
                            }
                        }
                    }
                }

                reader.update_stats();

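                // Drain each partition queue with a bounded amount of fuel per iteration;
                // if a queue was not exhausted, re-notify ourselves so the next iteration
                // keeps draining it.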
                let mut consumers = std::mem::take(&mut reader.partition_consumers);
                for consumer in consumers.iter_mut() {
                    let pid = consumer.pid();
                    let mut partition_exhausted = false;
                    for _ in 0..10_000 {
                        let Some(message) = consumer.get_next_message().transpose() else {
                            partition_exhausted = true;
                            break;
                        };

                        for output in outputs.iter() {
                            let message = match &message {
                                Ok((msg, pid)) => {
                                    let (msg, ts) =
                                        construct_source_message(msg, &output.metadata_columns);
                                    assert_eq!(*pid, ts.0);
                                    Ok(reader.handle_message(msg, ts, &output.output_index))
                                }
                                Err(err) => Err(err),
                            };
                            match message {
                                Ok(Some((msg, time, diff))) => {
                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
                                    let part_cap = &reader.partition_capabilities[pid].data;
                                    let msg = msg.map_err(|e| {
                                        DataflowError::SourceError(Box::new(SourceError {
                                            error: SourceErrorDetails::Other(e.to_string().into()),
                                        }))
                                    });
                                    data_output
                                        .give_fueled(
                                            part_cap,
                                            ((output.output_index, msg), time, diff),
                                        )
                                        .await;
                                }
                                Ok(None) => continue,
                                Err(err) => {
                                    let last_offset = reader
                                        .last_offsets
                                        .get(&output.output_index)
                                        .expect("output known to be installed")
                                        .get(&pid)
                                        .expect("partition known to be installed");

                                    let status = HealthStatusUpdate::stalled(
                                        format!(
                                            "error consuming from source: {} topic: {topic}: \
                                            partition: {pid} last processed offset: \
                                            {last_offset} : {err}",
                                            config.name
                                        ),
                                        None,
                                    );
                                    health_output.give(
                                        &health_cap,
                                        HealthStatusMessage {
                                            id: Some(output.id),
                                            namespace: StatusNamespace::Kafka,
                                            update: status,
                                        },
                                    );
                                }
                            }
                        }
                    }
                    if !partition_exhausted {
                        notificator.notify_one();
                    }
                }
                assert!(reader.partition_consumers.is_empty());
                reader.partition_consumers = consumers;

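                // Downgrade capabilities based on the consumer's current position in each
                // partition, tracking snapshot progress along the way.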
                let positions = reader.consumer.position().unwrap();
                let topic_positions = positions.elements_for_topic(&reader.topic_name);
                let mut snapshot_staged = 0;

                for position in topic_positions {
                    if let Offset::Offset(offset) = position.offset() {
                        let pid = position.partition();
                        let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
                        let upper =
                            Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);

                        let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
                        match part_cap.data.try_downgrade(&upper) {
                            Ok(()) => {
                                if is_snapshotting {
                                    snapshot_staged += offset.try_into().unwrap_or(0u64);
                                    if let Some(snapshot_total) = snapshot_total {
                                        snapshot_staged =
                                            std::cmp::min(snapshot_staged, snapshot_total);
                                    }
                                }
                            }
                            Err(_) => {
                                info!(
                                    source_id = config.id.to_string(),
                                    worker_id = config.worker_id,
                                    num_workers = config.worker_count,
                                    "kafka source frontier downgrade skipped due to already \
                                    seen offset: {:?}",
                                    upper
                                );
                            }
                        };

                        let _ = part_cap.progress.try_downgrade(&upper);
                    }
                }

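                // Emit progress statistics, falling back to the previously reported values
                // when this iteration produced no new ones.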
                let mut stats =
                    { std::mem::take(&mut *reader.progress_statistics.lock().expect("poisoned")) };

                let offset_committed = stats.offset_committed.take().or(prev_offset_committed);
                let offset_known = stats.offset_known.take().or(prev_offset_known);
                if let Some((offset_known, offset_committed)) = offset_known.zip(offset_committed) {
                    stats_output.give(
                        &stats_cap,
                        ProgressStatisticsUpdate::SteadyState {
                            offset_committed,
                            offset_known,
                        },
                    );
                }
                prev_offset_committed = offset_committed;
                prev_offset_known = offset_known;

                if let (Some(snapshot_total), true) = (snapshot_total, is_snapshotting) {
                    stats_output.give(
                        &stats_cap,
                        ProgressStatisticsUpdate::Snapshot {
                            records_known: snapshot_total,
                            records_staged: snapshot_staged,
                        },
                    );

                    if snapshot_total == snapshot_staged {
                        is_snapshotting = false;
                    }
                }
            }
        })
    });

    (
        stream.as_collection(),
        progress_stream,
        health_stream,
        stats_stream,
        button.press_on_drop(),
    )
}

impl KafkaResumeUpperProcessor {
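    /// Commits the offsets implied by `frontier` for the partitions this worker
    /// is responsible for and records the committed-offset statistic.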
    async fn process_frontier(
        &self,
        frontier: Antichain<KafkaTimestamp>,
    ) -> Result<(), anyhow::Error> {
        use rdkafka::consumer::CommitMode;

        let mut offsets = vec![];
        let mut progress_stat = 0;
        for ts in frontier.iter() {
            if let Some(pid) = ts.interval().singleton() {
                let pid = pid.unwrap_exact();
                if responsible_for_pid(&self.config, *pid) {
                    offsets.push((pid.clone(), *ts.timestamp()));

                    progress_stat += ts.timestamp().offset;
                }
            }
        }
        self.progress_statistics
            .lock()
            .expect("poisoned")
            .offset_committed = Some(progress_stat);

        if !offsets.is_empty() {
            let mut tpl = TopicPartitionList::new();
            for (pid, offset) in offsets {
                let offset_to_commit =
                    Offset::Offset(offset.offset.try_into().expect("offset to be valid i64"));
                tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
                    .expect("offset known to be valid");
            }
            let consumer = Arc::clone(&self.consumer);
            mz_ore::task::spawn_blocking(
                || format!("source({}) kafka offset commit", self.config.id),
                move || consumer.commit(&tpl, CommitMode::Sync),
            )
            .await??;
        }
        Ok(())
    }
}

impl KafkaSourceReader {
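    /// Ensures a partition queue for `pid` exists and that offset tracking for
    /// it is initialized for every output.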
    fn ensure_partition(&mut self, pid: PartitionId) {
        if self.last_offsets.is_empty() {
            tracing::info!(
                source_id = %self.id,
                worker_id = %self.worker_id,
                "kafka source does not have any outputs, not creating partition queue"
            );

            return;
        }
        for last_offsets in self.last_offsets.values() {
            if last_offsets.contains_key(&pid) {
                return;
            }
        }

        let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
        self.create_partition_queue(pid, Offset::Offset(start_offset));

        for last_offsets in self.last_offsets.values_mut() {
            let prev = last_offsets.insert(pid, start_offset - 1);
            assert_none!(prev);
        }
    }

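    /// Creates a partition queue for `partition_id`, adds the partition to the
    /// consumer's assignment, and re-splits the queues of previously tracked
    /// partitions against the new assignment.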
    fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
        info!(
            source_id = self.id.to_string(),
            worker_id = self.worker_id,
            num_workers = self.worker_count,
            "activating Kafka queue for topic {}, partition {}",
            self.topic_name,
            partition_id,
        );

        let tpl = self.consumer.assignment().unwrap();
        let mut partition_list = TopicPartitionList::new();
        for partition in tpl.elements_for_topic(&self.topic_name) {
            partition_list
                .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
                .expect("offset known to be valid");
        }
        partition_list
            .add_partition_offset(&self.topic_name, partition_id, initial_offset)
            .expect("offset known to be valid");
        self.consumer
            .assign(&partition_list)
            .expect("assignment known to be valid");

        let context = Arc::clone(self.consumer.context());
        for pc in &mut self.partition_consumers {
            pc.partition_queue = self
                .consumer
                .split_partition_queue(&self.topic_name, pc.pid)
                .expect("partition known to be valid");
            pc.partition_queue.set_nonempty_callback({
                let context = Arc::clone(&context);
                move || context.inner().activate()
            });
        }

        let mut partition_queue = self
            .consumer
            .split_partition_queue(&self.topic_name, partition_id)
            .expect("partition known to be valid");
        partition_queue.set_nonempty_callback(move || context.inner().activate());
        self.partition_consumers
            .push(PartitionConsumer::new(partition_id, partition_queue));
        assert_eq!(
            self.consumer
                .assignment()
                .unwrap()
                .elements_for_topic(&self.topic_name)
                .len(),
            self.partition_consumers.len()
        );
    }

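    /// Drains the statistics channel and updates the per-partition high
    /// watermark metrics with the latest values reported by librdkafka.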
    fn update_stats(&mut self) {
        while let Ok(stats) = self.stats_rx.try_recv() {
            match serde_json::from_str::<Statistics>(&stats.to_string()) {
                Ok(statistics) => {
                    let topic = statistics.topics.get(&self.topic_name);
                    match topic {
                        Some(topic) => {
                            for (id, partition) in &topic.partitions {
                                self.partition_metrics
                                    .set_offset_max(*id, partition.hi_offset);
                            }
                        }
                        None => error!("No stats found for topic: {}", &self.topic_name),
                    }
                }
                Err(e) => {
                    error!("failed decoding librdkafka statistics JSON: {}", e);
                }
            }
        }
    }

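    /// Checks a message against the last offset processed for its partition and
    /// output. Returns the message with its timestamp and diff if it advances
    /// the offset, or `None` if it was already seen.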
    fn handle_message(
        &mut self,
        message: Result<SourceMessage, KafkaHeaderParseError>,
        (partition, offset): (PartitionId, MzOffset),
        output_index: &usize,
    ) -> Option<(
        Result<SourceMessage, KafkaHeaderParseError>,
        KafkaTimestamp,
        Diff,
    )> {
        assert!(
            self.last_offsets
                .get(output_index)
                .unwrap()
                .contains_key(&partition)
        );

        let last_offset_ref = self
            .last_offsets
            .get_mut(output_index)
            .expect("output known to be installed")
            .get_mut(&partition)
            .expect("partition known to be installed");

        let last_offset = *last_offset_ref;
        let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
        if offset_as_i64 <= last_offset {
            info!(
                source_id = self.id.to_string(),
                worker_id = self.worker_id,
                num_workers = self.worker_count,
                "kafka message before expected offset: \
                source {} (reading topic {}, partition {}, output {}) \
                received offset {} expected offset {:?}",
                self.source_name,
                self.topic_name,
                partition,
                output_index,
                offset.offset,
                last_offset + 1,
            );
            None
        } else {
            *last_offset_ref = offset_as_i64;

            let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
            Some((message, ts, Diff::ONE))
        }
    }
}

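/// Builds a `SourceMessage` (key, value, and the requested metadata columns)
/// from a Kafka message, along with the partition and offset it came from.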
fn construct_source_message(
    msg: &BorrowedMessage<'_>,
    metadata_columns: &[KafkaMetadataKind],
) -> (
    Result<SourceMessage, KafkaHeaderParseError>,
    (PartitionId, MzOffset),
) {
    let pid = msg.partition();
    let Ok(offset) = u64::try_from(msg.offset()) else {
        panic!(
            "got negative offset ({}) from otherwise non-error'd kafka message",
            msg.offset()
        );
    };

    let mut metadata = Row::default();
    let mut packer = metadata.packer();
    for kind in metadata_columns {
        match kind {
            KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
            KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
            KafkaMetadataKind::Timestamp => {
                let ts = msg
                    .timestamp()
                    .to_millis()
                    .expect("kafka sources always have upstream_time");

                let d: Datum = DateTime::from_timestamp_millis(ts)
                    .and_then(|dt| {
                        let ct: Option<CheckedTimestamp<NaiveDateTime>> =
                            dt.naive_utc().try_into().ok();
                        ct
                    })
                    .into();
                packer.push(d)
            }
            KafkaMetadataKind::Header { key, use_bytes } => {
                match msg.headers() {
                    Some(headers) => {
                        let d = headers
                            .iter()
                            .filter(|header| header.key == key)
                            .last()
                            .map(|header| match header.value {
                                Some(v) => {
                                    if *use_bytes {
                                        Ok(Datum::Bytes(v))
                                    } else {
                                        match str::from_utf8(v) {
                                            Ok(str) => Ok(Datum::String(str)),
                                            Err(_) => Err(KafkaHeaderParseError::Utf8Error {
                                                key: key.clone(),
                                                raw: v.to_vec(),
                                            }),
                                        }
                                    }
                                }
                                None => Ok(Datum::Null),
                            })
                            .unwrap_or(Err(KafkaHeaderParseError::KeyNotFound {
                                key: key.clone(),
                            }));
                        match d {
                            Ok(d) => packer.push(d),
                            Err(err) => return (Err(err), (pid, offset.into())),
                        }
                    }
                    None => packer.push(Datum::Null),
                }
            }
            KafkaMetadataKind::Headers => {
                packer.push_list_with(|r| {
                    if let Some(headers) = msg.headers() {
                        for header in headers.iter() {
                            match header.value {
                                Some(v) => r.push_list_with(|record_row| {
                                    record_row.push(Datum::String(header.key));
                                    record_row.push(Datum::Bytes(v));
                                }),
                                None => r.push_list_with(|record_row| {
                                    record_row.push(Datum::String(header.key));
                                    record_row.push(Datum::Null);
                                }),
                            }
                        }
                    }
                });
            }
        }
    }

    let key = match msg.key() {
        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
        None => Row::pack([Datum::Null]),
    };
    let value = match msg.payload() {
        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
        None => Row::pack([Datum::Null]),
    };
    (
        Ok(SourceMessage {
            key,
            value,
            metadata,
        }),
        (pid, offset.into()),
    )
}

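/// Wrapper around a partition queue and the id of the partition it reads from.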
struct PartitionConsumer {
    pid: PartitionId,
    partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
}

impl PartitionConsumer {
    fn new(
        pid: PartitionId,
        partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
    ) -> Self {
        PartitionConsumer {
            pid,
            partition_queue,
        }
    }

    fn get_next_message(&self) -> Result<Option<(BorrowedMessage, PartitionId)>, KafkaError> {
        match self.partition_queue.poll(Duration::from_millis(0)) {
            Some(Ok(msg)) => Ok(Some((msg, self.pid))),
            Some(Err(err)) => Err(err),
            _ => Ok(None),
        }
    }

    fn pid(&self) -> PartitionId {
        self.pid
    }
}

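/// A consumer context that forwards librdkafka statistics to the reader and
/// wakes the operator whenever new data may be available.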
struct GlueConsumerContext {
    notificator: Arc<Notify>,
    stats_tx: crossbeam_channel::Sender<Jsonb>,
    inner: MzClientContext,
}

impl ClientContext for GlueConsumerContext {
    fn stats_raw(&self, statistics: &[u8]) {
        match Jsonb::from_slice(statistics) {
            Ok(statistics) => {
                self.stats_tx
                    .send(statistics)
                    .expect("timely operator hung up while Kafka source active");
                self.activate();
            }
            Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
        };
    }

    fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
        self.inner.log(level, fac, log_message)
    }

    fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
        self.inner.error(error, reason)
    }
}

impl GlueConsumerContext {
    fn activate(&self) {
        self.notificator.notify_one();
    }
}

impl ConsumerContext for GlueConsumerContext {}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use mz_kafka_util::client::create_new_client_config_simple;
    use rdkafka::consumer::{BaseConsumer, Consumer};
    use rdkafka::{Message, Offset, TopicPartitionList};
    use uuid::Uuid;

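    /// Exercises the race between the common consumer queue and a split-off
    /// partition queue: the assertion fails if any message is returned from the
    /// common queue. Ignored by default because it requires a local Kafka
    /// broker at localhost:9092 with a populated `queue-test` topic.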
    #[mz_ore::test]
    #[ignore]
    fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
        let topic_name = "queue-test";
        let pid = 0;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
        kafka_config.set("enable.auto.commit", "false");
        kafka_config.set("group.id", Uuid::new_v4().to_string());
        kafka_config.set("fetch.message.max.bytes", "100");
        let consumer: BaseConsumer<_> = kafka_config.create()?;

        let consumer = Arc::new(consumer);

        let mut partition_list = TopicPartitionList::new();
        partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;

        consumer.assign(&partition_list)?;

        let partition_queue = consumer
            .split_partition_queue(topic_name, pid)
            .expect("missing partition queue");

        let expected_messages = 1_000;

        let mut common_queue_count = 0;
        let mut partition_queue_count = 0;

        loop {
            if let Some(msg) = consumer.poll(Duration::from_millis(0)) {
                match msg {
                    Ok(msg) => {
                        let _payload =
                            std::str::from_utf8(msg.payload().expect("missing payload"))?;
                        if partition_queue_count > 0 {
                            anyhow::bail!(
                                "Got message from common queue after we internally switched to partition queue."
                            );
                        }

                        common_queue_count += 1;
                    }
                    Err(err) => anyhow::bail!("{}", err),
                }
            }

            match partition_queue.poll(Duration::from_millis(0)) {
                Some(Ok(msg)) => {
                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
                    partition_queue_count += 1;
                }
                Some(Err(err)) => anyhow::bail!("{}", err),
                _ => (),
            }

            if (common_queue_count + partition_queue_count) == expected_messages {
                break;
            }
        }

        assert!(
            common_queue_count == 0,
            "Got {} out of {} messages from common queue. Partition queue: {}",
            common_queue_count,
            expected_messages,
            partition_queue_count
        );

        Ok(())
    }
}

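/// Fetches the partition ids of `topic` and the current high watermark of each
/// partition.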
fn fetch_partition_info<C: ConsumerContext>(
    consumer: &BaseConsumer<C>,
    topic: &str,
    fetch_timeout: Duration,
) -> Result<BTreeMap<PartitionId, HighWatermark>, GetPartitionsError> {
    let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;

    let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
    for pid in pids {
        offset_requests.add_partition_offset(topic, pid, Offset::End)?;
    }

    let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;

    let mut result = BTreeMap::new();
    for entry in offset_responses.elements() {
        let offset = match entry.offset() {
            Offset::Offset(offset) => offset,
            offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
        };

        let pid = entry.partition();
        let watermark = offset.try_into().expect("invalid negative offset");
        result.insert(pid, watermark);
    }

    Ok(result)
}

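/// An update produced by the metadata fetcher: the current partitions with
/// their high watermarks, a transient (retryable) error, or a definite error.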
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum MetadataUpdate {
    Partitions(BTreeMap<PartitionId, HighWatermark>),
    TransientError(HealthStatus),
    DefiniteError(SourceError),
}

impl MetadataUpdate {
    fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
        match self {
            Self::Partitions(partitions) => {
                let max_pid = partitions.keys().last().copied();
                let lower = max_pid
                    .map(RangeBound::after)
                    .unwrap_or(RangeBound::NegInfinity);
                let future_ts =
                    Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));

                let mut frontier = Antichain::from_elem(future_ts);
                for (pid, high_watermark) in partitions {
                    frontier.insert(Partitioned::new_singleton(
                        RangeBound::exact(*pid),
                        MzOffset::from(*high_watermark),
                    ));
                }

                Some(frontier)
            }
            Self::DefiniteError(_) => Some(Antichain::new()),
            Self::TransientError(_) => None,
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum KafkaHeaderParseError {
    #[error("A header with key '{key}' was not found in the message headers")]
    KeyNotFound { key: String },
    #[error(
        "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
    )]
    Utf8Error { key: String, raw: Vec<u8> },
}

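/// Renders the metadata fetcher operator, which runs on a single worker and
/// periodically fetches partition counts and high watermarks for the topic,
/// emitting metadata updates and upstream frontier probes.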
fn render_metadata_fetcher<G: Scope<Timestamp = KafkaTimestamp>>(
    scope: &G,
    connection: KafkaSourceConnection,
    config: RawSourceCreationConfig,
) -> (
    Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
    Stream<G, Probe<KafkaTimestamp>>,
    PressOnDropButton,
) {
    let active_worker_id = usize::cast_from(config.id.hashed());
    let is_active_worker = active_worker_id % scope.peers() == scope.index();

    let resume_upper = Antichain::from_iter(
        config
            .source_resume_uppers
            .values()
            .map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row))
            .flatten(),
    );

    let name = format!("KafkaMetadataFetcher({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());

    let (metadata_output, metadata_stream) = builder.new_output();
    let (probe_output, probe_stream) = builder.new_output();

    let button = builder.build(move |caps| async move {
        if !is_active_worker {
            return;
        }

        let [metadata_cap, probe_cap] = caps.try_into().unwrap();

        let client_id = connection.client_id(
            config.config.config_set(),
            &config.config.connection_context,
            config.id,
        );
        let KafkaSourceConnection {
            connection,
            topic,
            topic_metadata_refresh_interval,
            ..
        } = connection;

        let consumer: Result<BaseConsumer<_>, _> = connection
            .create_with_context(
                &config.config,
                MzClientContext::default(),
                &btreemap! {
                    "topic.metadata.refresh.interval.ms" =>
                        topic_metadata_refresh_interval
                            .as_millis()
                            .to_string(),
                    "client.id" => format!("{client_id}-metadata"),
                },
                InTask::Yes,
            )
            .await;

        let consumer = match consumer {
            Ok(consumer) => consumer,
            Err(e) => {
                let msg = format!(
                    "failed creating kafka metadata consumer: {}",
                    e.display_with_causes()
                );
                let status_update = HealthStatusUpdate::halting(msg, None);
                let status = match e {
                    ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
                    _ => HealthStatus::kafka(status_update),
                };
                let error = MetadataUpdate::TransientError(status);
                let timestamp = (config.now_fn)().into();
                metadata_output.give(&metadata_cap, (timestamp, error));

                std::future::pending::<()>().await;
                unreachable!("pending future never returns");
            }
        };

        let (tx, mut rx) = mpsc::unbounded_channel();
        spawn_metadata_thread(config, consumer, topic, tx);

        let mut prev_upstream_frontier = resume_upper;

        while let Some((timestamp, mut update)) = rx.recv().await {
            if prev_upstream_frontier.is_empty() {
                return;
            }

            if let Some(upstream_frontier) = update.upstream_frontier() {
                if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
                    let error = SourceError {
                        error: SourceErrorDetails::Other("topic was recreated".into()),
                    };
                    update = MetadataUpdate::DefiniteError(error);
                }
            }

            if let Some(upstream_frontier) = update.upstream_frontier() {
                prev_upstream_frontier = upstream_frontier.clone();

                let probe = Probe {
                    probe_ts: timestamp,
                    upstream_frontier,
                };
                probe_output.give(&probe_cap, probe);
            }

            metadata_output.give(&metadata_cap, (timestamp, update));
        }
    });

    (metadata_stream, probe_stream, button.press_on_drop())
}

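/// Spawns a dedicated thread that periodically fetches partition metadata for
/// `topic` and sends the resulting updates over `tx` until the receiver is
/// dropped.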
fn spawn_metadata_thread<C: ConsumerContext>(
    config: RawSourceCreationConfig,
    consumer: BaseConsumer<TunnelingClientContext<C>>,
    topic: String,
    tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
) {
    thread::Builder::new()
        .name(format!("kfk-mtdt-{}", config.id))
        .spawn(move || {
            trace!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka metadata thread: starting..."
            );

            let mut ticker = probe::Ticker::new(
                || KAFKA_METADATA_FETCH_INTERVAL.get(config.config.config_set()),
                config.now_fn,
            );

            loop {
                let probe_ts = ticker.tick_blocking();
                let result = fetch_partition_info(
                    &consumer,
                    &topic,
                    config
                        .config
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                );
                trace!(
                    source_id = config.id.to_string(),
                    worker_id = config.worker_id,
                    num_workers = config.worker_count,
                    "kafka metadata thread: metadata fetch result: {:?}",
                    result
                );
                let update = match result {
                    Ok(partitions) => {
                        trace!(
                            source_id = config.id.to_string(),
                            worker_id = config.worker_id,
                            num_workers = config.worker_count,
                            "kafka metadata thread: fetched partition metadata info",
                        );

                        MetadataUpdate::Partitions(partitions)
                    }
                    Err(GetPartitionsError::TopicDoesNotExist) => {
                        let error = SourceError {
                            error: SourceErrorDetails::Other("topic was deleted".into()),
                        };
                        MetadataUpdate::DefiniteError(error)
                    }
                    Err(e) => {
                        let kafka_status = Some(HealthStatusUpdate::stalled(
                            format!("{}", e.display_with_causes()),
                            None,
                        ));

                        let ssh_status = consumer.client().context().tunnel_status();
                        let ssh_status = match ssh_status {
                            SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
                            SshTunnelStatus::Errored(e) => {
                                Some(HealthStatusUpdate::stalled(e, None))
                            }
                        };

                        MetadataUpdate::TransientError(HealthStatus {
                            kafka: kafka_status,
                            ssh: ssh_status,
                        })
                    }
                };

                if tx.send((probe_ts, update)).is_err() {
                    break;
                }
            }

            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka metadata thread: receiver has gone away; shutting down."
            )
        })
        .unwrap();
}