1use std::collections::BTreeMap;
11use std::collections::btree_map::Entry;
12use std::convert::Infallible;
13use std::str::{self};
14use std::sync::Arc;
15use std::thread;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use chrono::{DateTime, NaiveDateTime};
20use differential_dataflow::{AsCollection, Hashable};
21use futures::StreamExt;
22use itertools::Itertools;
23use maplit::btreemap;
24use mz_kafka_util::client::{
25 GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
26};
27use mz_ore::assert_none;
28use mz_ore::cast::CastFrom;
29use mz_ore::error::ErrorExt;
30use mz_ore::future::InTask;
31use mz_ore::iter::IteratorExt;
32use mz_repr::adt::timestamp::CheckedTimestamp;
33use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
34use mz_ssh_util::tunnel::SshTunnelStatus;
35use mz_storage_types::dyncfgs::KAFKA_METADATA_FETCH_INTERVAL;
36use mz_storage_types::errors::{
37 ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
38};
39use mz_storage_types::sources::kafka::{
40 KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
41};
42use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
43use mz_timely_util::antichain::AntichainExt;
44use mz_timely_util::builder_async::{
45 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
46};
47use mz_timely_util::containers::stack::AccountedStackBuilder;
48use mz_timely_util::order::Partitioned;
49use rdkafka::consumer::base_consumer::PartitionQueue;
50use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
51use rdkafka::error::KafkaError;
52use rdkafka::message::{BorrowedMessage, Headers};
53use rdkafka::statistics::Statistics;
54use rdkafka::topic_partition_list::Offset;
55use rdkafka::{ClientContext, Message, TopicPartitionList};
56use serde::{Deserialize, Serialize};
57use timely::PartialOrder;
58use timely::container::CapacityContainerBuilder;
59use timely::dataflow::channels::pact::Pipeline;
60use timely::dataflow::operators::core::Partition;
61use timely::dataflow::operators::{Broadcast, Capability};
62use timely::dataflow::{Scope, Stream};
63use timely::progress::Antichain;
64use timely::progress::Timestamp;
65use tokio::sync::{Notify, mpsc};
66use tracing::{error, info, trace};
67
68use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
69use crate::metrics::source::kafka::KafkaSourceMetrics;
70use crate::source::types::{Probe, SignaledFuture, SourceRender, StackedCollection};
71use crate::source::{RawSourceCreationConfig, SourceMessage, probe};
72use crate::statistics::SourceStatistics;
73
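/// The combined Kafka and SSH health of the source, as observed by the
/// metadata fetcher.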
74#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
75struct HealthStatus {
76 kafka: Option<HealthStatusUpdate>,
77 ssh: Option<HealthStatusUpdate>,
78}
79
80impl HealthStatus {
81 fn kafka(update: HealthStatusUpdate) -> Self {
82 Self {
83 kafka: Some(update),
84 ssh: None,
85 }
86 }
87
88 fn ssh(update: HealthStatusUpdate) -> Self {
89 Self {
90 kafka: None,
91 ssh: Some(update),
92 }
93 }
94}
95
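/// Per-worker state of the Kafka reader operator: the underlying consumer,
/// the per-partition queues split off from it, per-output offset bookkeeping,
/// statistics plumbing, and the capabilities used to manage the frontier.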
96pub struct KafkaSourceReader {
98 topic_name: String,
100 source_name: String,
102 id: GlobalId,
104 consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
106 partition_consumers: Vec<PartitionConsumer>,
108 worker_id: usize,
110 worker_count: usize,
112 last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
116 start_offsets: BTreeMap<PartitionId, i64>,
118 stats_rx: crossbeam_channel::Receiver<Jsonb>,
120 partition_metrics: KafkaSourceMetrics,
122 partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
124}
125
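/// The capabilities held for a single partition, allowing data emission and
/// progress tracking to be downgraded independently.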
126struct PartitionCapability {
127 data: Capability<KafkaTimestamp>,
129 progress: Capability<KafkaTimestamp>,
131}
132
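/// The high watermark (exclusive upper offset) reported for a Kafka partition.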
133type HighWatermark = u64;
137
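/// Committer of resume upper frontiers: translates a frontier into
/// per-partition offsets, commits them for the consumer group, and records
/// them in the source statistics.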
138pub struct KafkaResumeUpperProcessor {
141 config: RawSourceCreationConfig,
142 topic_name: String,
143 consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
144 statistics: Vec<SourceStatistics>,
145}
146
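/// Whether this worker is responsible for consuming the given partition.
/// Partitions are distributed round-robin across workers, starting from the
/// worker responsible for the source itself.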
147fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
151 let pid = usize::try_from(pid).expect("positive pid");
152 ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
153}
154
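/// Per-export information needed while rendering: the export's id, output
/// index, resume upper, and requested Kafka metadata columns.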
155struct SourceOutputInfo {
156 id: GlobalId,
157 output_index: usize,
158 resume_upper: Antichain<KafkaTimestamp>,
159 metadata_columns: Vec<KafkaMetadataKind>,
160}
161
162impl SourceRender for KafkaSourceConnection {
163 type Time = KafkaTimestamp;
168
169 const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;
170
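    /// Renders the Kafka source dataflow: a metadata fetcher operator feeding
    /// a reader operator, whose single output stream is then partitioned into
    /// one collection per source export.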
171 fn render<G: Scope<Timestamp = KafkaTimestamp>>(
172 self,
173 scope: &mut G,
174 config: &RawSourceCreationConfig,
175 resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
176 start_signal: impl std::future::Future<Output = ()> + 'static,
177 ) -> (
178 BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
179 Stream<G, Infallible>,
180 Stream<G, HealthStatusMessage>,
181 Option<Stream<G, Probe<KafkaTimestamp>>>,
182 Vec<PressOnDropButton>,
183 ) {
184 let (metadata, probes, metadata_token) =
185 render_metadata_fetcher(scope, self.clone(), config.clone());
186 let (data, progress, health, reader_token) = render_reader(
187 scope,
188 self,
189 config.clone(),
190 resume_uppers,
191 metadata,
192 start_signal,
193 );
194
195 let partition_count = u64::cast_from(config.source_exports.len());
196 let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
197 partition_count,
198 |((output, data), time, diff): &(
199 (usize, Result<SourceMessage, DataflowError>),
200 _,
201 Diff,
202 )| {
203 let output = u64::cast_from(*output);
204 (output, (data.clone(), time.clone(), diff.clone()))
205 },
206 );
207 let mut data_collections = BTreeMap::new();
208 for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
209 data_collections.insert(*id, data_stream.as_collection());
210 }
211
212 (
213 data_collections,
214 progress,
215 health,
216 Some(probes),
217 vec![metadata_token, reader_token],
218 )
219 }
220}
221
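/// Renders the reader operator of the Kafka source. It creates the consumer,
/// splits off a queue for every partition this worker owns, emits decoded
/// messages for each source export, downgrades per-partition capabilities
/// based on consumer positions, and reports health status updates.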
222fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
227 scope: &G,
228 connection: KafkaSourceConnection,
229 config: RawSourceCreationConfig,
230 resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
231 metadata_stream: Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
232 start_signal: impl std::future::Future<Output = ()> + 'static,
233) -> (
234 StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
235 Stream<G, Infallible>,
236 Stream<G, HealthStatusMessage>,
237 PressOnDropButton,
238) {
239 let name = format!("KafkaReader({})", config.id);
240 let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
241
242 let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
243 let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
244 let (health_output, health_stream) = builder.new_output();
245
246 let mut metadata_input = builder.new_disconnected_input(&metadata_stream.broadcast(), Pipeline);
247
248 let mut outputs = vec![];
249
250 let mut all_export_stats = vec![];
252 let mut snapshot_export_stats = vec![];
253 for (idx, (id, export)) in config.source_exports.iter().enumerate() {
254 let SourceExport {
255 details,
256 storage_metadata: _,
257 data_config: _,
258 } = export;
259 let resume_upper = Antichain::from_iter(
260 config
261 .source_resume_uppers
262 .get(id)
263 .expect("all source exports must be present in source resume uppers")
264 .iter()
265 .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
266 );
267
268 let metadata_columns = match details {
269 SourceExportDetails::Kafka(details) => details
270 .metadata_columns
271 .iter()
272 .map(|(_name, kind)| kind.clone())
273 .collect::<Vec<_>>(),
274 _ => panic!("unexpected source export details: {:?}", details),
275 };
276
277 let statistics = config
278 .statistics
279 .get(id)
280 .expect("statistics have been initialized")
281 .clone();
282 if resume_upper.as_ref() == &[Partitioned::minimum()] {
284 snapshot_export_stats.push(statistics.clone());
285 }
286 all_export_stats.push(statistics);
287
288 let output = SourceOutputInfo {
289 id: *id,
290 resume_upper,
291 output_index: idx,
292 metadata_columns,
293 };
294 outputs.push(output);
295 }
296
297 let busy_signal = Arc::clone(&config.busy_signal);
298 let button = builder.build(move |caps| {
299 SignaledFuture::new(busy_signal, async move {
300 let [mut data_cap, mut progress_cap, health_cap] = caps.try_into().unwrap();
301
302 let client_id = connection.client_id(
303 config.config.config_set(),
304 &config.config.connection_context,
305 config.id,
306 );
307 let group_id = connection.group_id(&config.config.connection_context, config.id);
308 let KafkaSourceConnection {
309 connection,
310 topic,
311 topic_metadata_refresh_interval,
312 start_offsets,
313 metadata_columns: _,
                connection_id: _,
                group_id_prefix: _,
            } = connection;
319
            let mut start_offsets: BTreeMap<_, i64> = start_offsets
                .into_iter()
                .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
                .collect();
327
328 let mut partition_capabilities = BTreeMap::new();
329 let mut max_pid = None;
            let resume_upper = Antichain::from_iter(
                outputs
                    .iter()
                    .flat_map(|output| output.resume_upper.clone()),
            );
336
337 for ts in resume_upper.elements() {
338 if let Some(pid) = ts.interval().singleton() {
339 let pid = pid.unwrap_exact();
340 max_pid = std::cmp::max(max_pid, Some(*pid));
341 if responsible_for_pid(&config, *pid) {
342 let restored_offset = i64::try_from(ts.timestamp().offset)
343 .expect("restored kafka offsets must fit into i64");
344 if let Some(start_offset) = start_offsets.get_mut(pid) {
345 *start_offset = std::cmp::max(restored_offset, *start_offset);
346 } else {
347 start_offsets.insert(*pid, restored_offset);
348 }
349
350 let part_ts = Partitioned::new_singleton(
351 RangeBound::exact(*pid),
352 ts.timestamp().clone(),
353 );
354 let part_cap = PartitionCapability {
355 data: data_cap.delayed(&part_ts),
356 progress: progress_cap.delayed(&part_ts),
357 };
358 partition_capabilities.insert(*pid, part_cap);
359 }
360 }
361 }
362 let lower = max_pid
363 .map(RangeBound::after)
364 .unwrap_or(RangeBound::NegInfinity);
365 let future_ts =
366 Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
367 data_cap.downgrade(&future_ts);
368 progress_cap.downgrade(&future_ts);
369
370 info!(
371 source_id = config.id.to_string(),
372 worker_id = config.worker_id,
373 num_workers = config.worker_count,
374 "instantiating Kafka source reader at offsets {start_offsets:?}"
375 );
376
377 let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
378 let notificator = Arc::new(Notify::new());
379
380 let consumer: Result<BaseConsumer<_>, _> = connection
381 .create_with_context(
382 &config.config,
383 GlueConsumerContext {
384 notificator: Arc::clone(¬ificator),
385 stats_tx,
386 inner: MzClientContext::default(),
387 },
388 &btreemap! {
389 "enable.auto.commit" => "false".into(),
394 "auto.offset.reset" => "earliest".into(),
397 "topic.metadata.refresh.interval.ms" =>
400 topic_metadata_refresh_interval
401 .as_millis()
402 .to_string(),
403 "fetch.message.max.bytes" => "134217728".into(),
405 "group.id" => group_id.clone(),
411 "client.id" => client_id.clone(),
414 },
415 InTask::Yes,
416 )
417 .await;
418
419 let consumer = match consumer {
420 Ok(consumer) => Arc::new(consumer),
421 Err(e) => {
422 let update = HealthStatusUpdate::halting(
423 format!(
424 "failed creating kafka reader consumer: {}",
425 e.display_with_causes()
426 ),
427 None,
428 );
429 for (output, update) in outputs.iter().repeat_clone(update) {
430 health_output.give(
431 &health_cap,
432 HealthStatusMessage {
433 id: Some(output.id),
434 namespace: if matches!(e, ContextCreationError::Ssh(_)) {
435 StatusNamespace::Ssh
436 } else {
437 StatusNamespace::Kafka
438 },
439 update,
440 },
441 );
442 }
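                        // Wedge forever instead of returning: dropping the
                        // capabilities here would advance the source frontier
                        // even though we merely halted due to an error.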
443 std::future::pending::<()>().await;
447 unreachable!("pending future never returns");
448 }
449 };
450
451 start_signal.await;
455 info!(
456 source_id = config.id.to_string(),
457 worker_id = config.worker_id,
458 num_workers = config.worker_count,
459 "kafka worker noticed rehydration is finished, starting partition queues..."
460 );
461
462 let partition_ids = start_offsets.keys().copied().collect();
463 let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);
464
465 let mut reader = KafkaSourceReader {
466 topic_name: topic.clone(),
467 source_name: config.name.clone(),
468 id: config.id,
469 partition_consumers: Vec::new(),
470 consumer: Arc::clone(&consumer),
471 worker_id: config.worker_id,
472 worker_count: config.worker_count,
473 last_offsets: outputs
474 .iter()
475 .map(|output| (output.output_index, BTreeMap::new()))
476 .collect(),
477 start_offsets,
478 stats_rx,
479 partition_metrics: config.metrics.get_kafka_source_metrics(
480 partition_ids,
481 topic.clone(),
482 config.id,
483 ),
484 partition_capabilities,
485 };
486
487 let offset_committer = KafkaResumeUpperProcessor {
488 config: config.clone(),
489 topic_name: topic.clone(),
490 consumer,
491 statistics: all_export_stats.clone(),
492 };
493
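            // If any export is still snapshotting, commit the starting offsets
            // right away and zero the snapshot statistics so progress is
            // reported before the first messages arrive.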
494 if !snapshot_export_stats.is_empty() {
496 if let Err(e) = offset_committer
497 .process_frontier(resume_upper.clone())
498 .await
499 {
500 offset_commit_metrics.offset_commit_failures.inc();
501 tracing::warn!(
502 %e,
503 "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
504 worker_id = config.worker_id,
505 source_id = config.id,
506 upper = resume_upper.pretty()
507 );
508 }
509 for statistics in config.statistics.values() {
513 statistics.set_snapshot_records_known(0);
514 statistics.set_snapshot_records_staged(0);
515 }
516 }
517
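            // Continuously commit offsets as the resume upper advances. Once
            // the stream of resume uppers ends, the future wedges instead of
            // completing, since the select! loop below keeps polling it.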
518 let resume_uppers_process_loop = async move {
519 tokio::pin!(resume_uppers);
520 while let Some(frontier) = resume_uppers.next().await {
521 if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
522 offset_commit_metrics.offset_commit_failures.inc();
523 tracing::warn!(
524 %e,
525 "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
526 worker_id = config.worker_id,
527 source_id = config.id,
528 upper = frontier.pretty()
529 );
530 }
531 }
532 std::future::pending::<()>().await;
537 };
538 tokio::pin!(resume_uppers_process_loop);
539
540 let mut metadata_update: Option<MetadataUpdate> = None;
541 let mut snapshot_total = None;
542
543 let max_wait_time =
544 mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
545 loop {
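                // Wait for activity: either the consumer context signals new
                // data, the poll interval elapses, a metadata update arrives,
                // or an offset commit makes progress.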
546 tokio::select! {
549 _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},
552
553 _ = metadata_input.ready() => {
554 let mut updates = Vec::new();
556 while let Some(event) = metadata_input.next_sync() {
557 if let Event::Data(_, mut data) = event {
558 updates.append(&mut data);
559 }
560 }
561 metadata_update = updates
562 .into_iter()
563 .max_by_key(|(ts, _)| *ts)
564 .map(|(_, update)| update);
565 }
566
567 _ = resume_uppers_process_loop.as_mut() => {},
571 }
572
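                // Process the latest metadata update, if any: register newly
                // discovered partitions, forward transient errors as health
                // updates, or emit a definite error and shut down.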
573 match metadata_update.take() {
574 Some(MetadataUpdate::Partitions(partitions)) => {
575 let max_pid = partitions.keys().last().cloned();
576 let lower = max_pid
577 .map(RangeBound::after)
578 .unwrap_or(RangeBound::NegInfinity);
579 let future_ts = Partitioned::new_range(
580 lower,
581 RangeBound::PosInfinity,
582 MzOffset::from(0),
583 );
584
585 let mut offset_known = 0;
586 for (&pid, &high_watermark) in &partitions {
587 if responsible_for_pid(&config, pid) {
588 offset_known += high_watermark;
589 reader.ensure_partition(pid);
590 if let Entry::Vacant(entry) =
591 reader.partition_capabilities.entry(pid)
592 {
593 let start_offset = match reader.start_offsets.get(&pid) {
594 Some(&offset) => offset.try_into().unwrap(),
595 None => 0u64,
596 };
597 let part_since_ts = Partitioned::new_singleton(
598 RangeBound::exact(pid),
599 MzOffset::from(start_offset),
600 );
601 let part_upper_ts = Partitioned::new_singleton(
602 RangeBound::exact(pid),
603 MzOffset::from(high_watermark),
604 );
605
606 entry.insert(PartitionCapability {
616 data: data_cap.delayed(&part_since_ts),
617 progress: progress_cap.delayed(&part_upper_ts),
618 });
619 }
620 }
621 }
622
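                        // Remember the snapshot size from the first metadata
                        // update seen while snapshotting; the initial high
                        // watermarks bound the snapshot's contents.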
623 if !snapshot_export_stats.is_empty() && snapshot_total.is_none() {
626 snapshot_total = Some(offset_known);
630 }
631
632 for output in &outputs {
636 for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
637 health_output.give(
638 &health_cap,
639 HealthStatusMessage {
640 id: Some(output.id),
641 namespace,
642 update: HealthStatusUpdate::running(),
643 },
644 );
645 }
646 }
647
648 for export_stat in all_export_stats.iter() {
649 export_stat.set_offset_known(offset_known);
650 }
651
652 data_cap.downgrade(&future_ts);
653 progress_cap.downgrade(&future_ts);
654 }
655 Some(MetadataUpdate::TransientError(status)) => {
656 if let Some(update) = status.kafka {
657 for (output, update) in outputs.iter().repeat_clone(update) {
658 health_output.give(
659 &health_cap,
660 HealthStatusMessage {
661 id: Some(output.id),
662 namespace: StatusNamespace::Kafka,
663 update,
664 },
665 );
666 }
667 }
668 if let Some(update) = status.ssh {
669 for (output, update) in outputs.iter().repeat_clone(update) {
670 health_output.give(
671 &health_cap,
672 HealthStatusMessage {
673 id: Some(output.id),
674 namespace: StatusNamespace::Ssh,
675 update,
676 },
677 );
678 }
679 }
680 }
681 Some(MetadataUpdate::DefiniteError(error)) => {
682 let error = Err(error.into());
683 let time = data_cap.time().clone();
684 for (output, error) in
685 outputs.iter().map(|o| o.output_index).repeat_clone(error)
686 {
687 data_output
688 .give_fueled(&data_cap, ((output, error), time, Diff::ONE))
689 .await;
690 }
691
692 return;
693 }
694 None => {}
695 }
696
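                // Drain the main consumer queue. After the partition queues
                // have been split off this normally only yields errors, but
                // messages buffered before the split can still arrive here.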
697 while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
705 match result {
706 Err(e) => {
707 let error = format!(
708 "kafka error when polling consumer for source: {} topic: {} : {}",
709 reader.source_name, reader.topic_name, e
710 );
711 let status = HealthStatusUpdate::stalled(error, None);
712 for (output, status) in outputs.iter().repeat_clone(status) {
713 health_output.give(
714 &health_cap,
715 HealthStatusMessage {
716 id: Some(output.id),
717 namespace: StatusNamespace::Kafka,
718 update: status,
719 },
720 );
721 }
722 }
723 Ok(message) => {
724 let output_messages = outputs
725 .iter()
726 .map(|output| {
727 let (message, ts) = construct_source_message(
728 &message,
729 &output.metadata_columns,
730 );
731 (output.output_index, message, ts)
732 })
733 .collect::<Vec<_>>();
737 for (output_index, message, ts) in output_messages {
738 if let Some((msg, time, diff)) =
739 reader.handle_message(message, ts, &output_index)
740 {
741 let pid = time.interval().singleton().unwrap().unwrap_exact();
742 let part_cap = &reader.partition_capabilities[pid].data;
743 let msg = msg.map_err(|e| {
744 DataflowError::SourceError(Box::new(SourceError {
745 error: SourceErrorDetails::Other(e.to_string().into()),
746 }))
747 });
748 data_output
749 .give_fueled(part_cap, ((output_index, msg), time, diff))
750 .await;
751 }
752 }
753 }
754 }
755 }
756
757 reader.update_stats();
758
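                // Poll each partition queue, emitting at most a bounded batch
                // per partition before yielding so that one busy partition
                // cannot starve the others; re-arm the notificator if a
                // partition still has data left.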
759 let mut consumers = std::mem::take(&mut reader.partition_consumers);
761 for consumer in consumers.iter_mut() {
762 let pid = consumer.pid();
763 let mut partition_exhausted = false;
769 for _ in 0..10_000 {
770 let Some(message) = consumer.get_next_message().transpose() else {
771 partition_exhausted = true;
772 break;
773 };
774
775 for output in outputs.iter() {
776 let message = match &message {
777 Ok((msg, pid)) => {
778 let (msg, ts) =
779 construct_source_message(msg, &output.metadata_columns);
780 assert_eq!(*pid, ts.0);
781 Ok(reader.handle_message(msg, ts, &output.output_index))
782 }
783 Err(err) => Err(err),
784 };
785 match message {
786 Ok(Some((msg, time, diff))) => {
787 let pid = time.interval().singleton().unwrap().unwrap_exact();
788 let part_cap = &reader.partition_capabilities[pid].data;
789 let msg = msg.map_err(|e| {
790 DataflowError::SourceError(Box::new(SourceError {
791 error: SourceErrorDetails::Other(e.to_string().into()),
792 }))
793 });
794 data_output
795 .give_fueled(
796 part_cap,
797 ((output.output_index, msg), time, diff),
798 )
799 .await;
800 }
801 Ok(None) => continue,
803 Err(err) => {
804 let last_offset = reader
805 .last_offsets
806 .get(&output.output_index)
807 .expect("output known to be installed")
808 .get(&pid)
809 .expect("partition known to be installed");
810
811 let status = HealthStatusUpdate::stalled(
812 format!(
813 "error consuming from source: {} topic: {topic}:\
814 partition: {pid} last processed offset:\
815 {last_offset} : {err}",
816 config.name
817 ),
818 None,
819 );
820 health_output.give(
821 &health_cap,
822 HealthStatusMessage {
823 id: Some(output.id),
824 namespace: StatusNamespace::Kafka,
825 update: status,
826 },
827 );
828 }
829 }
830 }
831 }
832 if !partition_exhausted {
833 notificator.notify_one();
834 }
835 }
836 assert!(reader.partition_consumers.is_empty());
838 reader.partition_consumers = consumers;
839
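                // Downgrade capabilities according to the consumer's position
                // in every partition and update snapshot progress statistics.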
840 let positions = reader.consumer.position().unwrap();
841 let topic_positions = positions.elements_for_topic(&reader.topic_name);
842 let mut snapshot_staged = 0;
843
844 for position in topic_positions {
845 if let Offset::Offset(offset) = position.offset() {
848 let pid = position.partition();
849 let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
850 let upper =
851 Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);
852
853 let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
854 match part_cap.data.try_downgrade(&upper) {
855 Ok(()) => {
856 if !snapshot_export_stats.is_empty() {
857 snapshot_staged += offset.try_into().unwrap_or(0u64);
860 if let Some(snapshot_total) = snapshot_total {
862 snapshot_staged =
865 std::cmp::min(snapshot_staged, snapshot_total);
866 }
867 }
868 }
869 Err(_) => {
870 info!(
873 source_id = config.id.to_string(),
874 worker_id = config.worker_id,
875 num_workers = config.worker_count,
876 "kafka source frontier downgrade skipped due to already \
877 seen offset: {:?}",
878 upper
879 );
880 }
881 };
882
883 let _ = part_cap.progress.try_downgrade(&upper);
888 }
889 }
890
891 if let (Some(snapshot_total), true) =
892 (snapshot_total, !snapshot_export_stats.is_empty())
893 {
894 for export_stat in snapshot_export_stats.iter() {
895 export_stat.set_snapshot_records_known(snapshot_total);
896 export_stat.set_snapshot_records_staged(snapshot_staged);
897 }
898 if snapshot_total == snapshot_staged {
899 snapshot_export_stats.clear();
900 }
901 }
902 }
903 })
904 });
905
906 (
907 stream.as_collection(),
908 progress_stream,
909 health_stream,
910 button.press_on_drop(),
911 )
912}
913
914impl KafkaResumeUpperProcessor {
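    /// Commits the offsets implied by `frontier` for every partition this
    /// worker is responsible for and records them in the source statistics.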
915 async fn process_frontier(
916 &self,
917 frontier: Antichain<KafkaTimestamp>,
918 ) -> Result<(), anyhow::Error> {
919 use rdkafka::consumer::CommitMode;
920
921 let mut offsets = vec![];
923 let mut offset_committed = 0;
924 for ts in frontier.iter() {
925 if let Some(pid) = ts.interval().singleton() {
926 let pid = pid.unwrap_exact();
927 if responsible_for_pid(&self.config, *pid) {
928 offsets.push((pid.clone(), *ts.timestamp()));
929
930 offset_committed += ts.timestamp().offset;
935 }
936 }
937 }
938
939 for export_stat in self.statistics.iter() {
940 export_stat.set_offset_committed(offset_committed);
941 }
942
943 if !offsets.is_empty() {
944 let mut tpl = TopicPartitionList::new();
945 for (pid, offset) in offsets {
946 let offset_to_commit =
                    Offset::Offset(offset.offset.try_into().expect("offset to be valid i64"));
948 tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
949 .expect("offset known to be valid");
950 }
951 let consumer = Arc::clone(&self.consumer);
952 mz_ore::task::spawn_blocking(
953 || format!("source({}) kafka offset commit", self.config.id),
954 move || consumer.commit(&tpl, CommitMode::Sync),
955 )
956 .await??;
957 }
958 Ok(())
959 }
960}
961
962impl KafkaSourceReader {
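    /// Ensures that a partition queue for `pid` exists, creating one at the
    /// configured start offset the first time the partition is seen. Does
    /// nothing if the source has no outputs.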
963 fn ensure_partition(&mut self, pid: PartitionId) {
965 if self.last_offsets.is_empty() {
966 tracing::info!(
967 source_id = %self.id,
968 worker_id = %self.worker_id,
969 "kafka source does not have any outputs, not creating partition queue");
970
971 return;
972 }
973 for last_offsets in self.last_offsets.values() {
974 if last_offsets.contains_key(&pid) {
976 return;
977 }
978 }
979
980 let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
981 self.create_partition_queue(pid, Offset::Offset(start_offset));
982
983 for last_offsets in self.last_offsets.values_mut() {
984 let prev = last_offsets.insert(pid, start_offset - 1);
985 assert_none!(prev);
986 }
987 }
988
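    /// Creates a dedicated queue for `partition_id` and adds the partition to
    /// the consumer assignment. After updating the assignment, the queues of
    /// all existing partition consumers are re-split and their wakeup
    /// callbacks re-installed.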
989 fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
991 info!(
992 source_id = self.id.to_string(),
993 worker_id = self.worker_id,
994 num_workers = self.worker_count,
995 "activating Kafka queue for topic {}, partition {}",
996 self.topic_name,
997 partition_id,
998 );
999
1000 let tpl = self.consumer.assignment().unwrap();
1002 let mut partition_list = TopicPartitionList::new();
1004 for partition in tpl.elements_for_topic(&self.topic_name) {
1005 partition_list
1006 .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
1007 .expect("offset known to be valid");
1008 }
1009 partition_list
1011 .add_partition_offset(&self.topic_name, partition_id, initial_offset)
1012 .expect("offset known to be valid");
1013 self.consumer
1014 .assign(&partition_list)
1015 .expect("assignment known to be valid");
1016
1017 let context = Arc::clone(self.consumer.context());
1020 for pc in &mut self.partition_consumers {
1021 pc.partition_queue = self
1022 .consumer
1023 .split_partition_queue(&self.topic_name, pc.pid)
1024 .expect("partition known to be valid");
1025 pc.partition_queue.set_nonempty_callback({
1026 let context = Arc::clone(&context);
1027 move || context.inner().activate()
1028 });
1029 }
1030
1031 let mut partition_queue = self
1032 .consumer
1033 .split_partition_queue(&self.topic_name, partition_id)
1034 .expect("partition known to be valid");
1035 partition_queue.set_nonempty_callback(move || context.inner().activate());
1036 self.partition_consumers
1037 .push(PartitionConsumer::new(partition_id, partition_queue));
1038 assert_eq!(
1039 self.consumer
1040 .assignment()
1041 .unwrap()
1042 .elements_for_topic(&self.topic_name)
1043 .len(),
1044 self.partition_consumers.len()
1045 );
1046 }
1047
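    /// Drains the librdkafka statistics channel and records the latest
    /// per-partition high offsets in the source metrics.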
1048 fn update_stats(&mut self) {
1050 while let Ok(stats) = self.stats_rx.try_recv() {
1051 match serde_json::from_str::<Statistics>(&stats.to_string()) {
1052 Ok(statistics) => {
1053 let topic = statistics.topics.get(&self.topic_name);
1054 match topic {
1055 Some(topic) => {
1056 for (id, partition) in &topic.partitions {
1057 self.partition_metrics
1058 .set_offset_max(*id, partition.hi_offset);
1059 }
1060 }
1061 None => error!("No stats found for topic: {}", &self.topic_name),
1062 }
1063 }
1064 Err(e) => {
1065 error!("failed decoding librdkafka statistics JSON: {}", e);
1066 }
1067 }
1068 }
1069 }
1070
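    /// Checks a message against the last offset recorded for its partition and
    /// output. Messages at or below that offset are dropped as duplicates,
    /// which can happen when a message is seen on both the common consumer
    /// queue and its partition queue; otherwise the offset bookkeeping is
    /// updated and the message is returned with its timestamp and diff.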
1071 fn handle_message(
1074 &mut self,
1075 message: Result<SourceMessage, KafkaHeaderParseError>,
1076 (partition, offset): (PartitionId, MzOffset),
1077 output_index: &usize,
1078 ) -> Option<(
1079 Result<SourceMessage, KafkaHeaderParseError>,
1080 KafkaTimestamp,
1081 Diff,
1082 )> {
1083 assert!(
1095 self.last_offsets
1096 .get(output_index)
1097 .unwrap()
1098 .contains_key(&partition)
1099 );
1100
1101 let last_offset_ref = self
1102 .last_offsets
1103 .get_mut(output_index)
1104 .expect("output known to be installed")
1105 .get_mut(&partition)
1106 .expect("partition known to be installed");
1107
1108 let last_offset = *last_offset_ref;
1109 let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
1110 if offset_as_i64 <= last_offset {
1111 info!(
1112 source_id = self.id.to_string(),
1113 worker_id = self.worker_id,
1114 num_workers = self.worker_count,
1115 "kafka message before expected offset: \
1116 source {} (reading topic {}, partition {}, output {}) \
1117 received offset {} expected offset {:?}",
1118 self.source_name,
1119 self.topic_name,
1120 partition,
1121 output_index,
1122 offset.offset,
1123 last_offset + 1,
1124 );
1125 None
1127 } else {
1128 *last_offset_ref = offset_as_i64;
1129
1130 let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
1131 Some((message, ts, Diff::ONE))
1132 }
1133 }
1134}
1135
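/// Converts a raw Kafka message into a `SourceMessage`, packing the requested
/// metadata columns (partition, offset, timestamp, individual headers, or the
/// full header list) into the metadata row, and returns it along with the
/// message's partition and offset.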
1136fn construct_source_message(
1137 msg: &BorrowedMessage<'_>,
1138 metadata_columns: &[KafkaMetadataKind],
1139) -> (
1140 Result<SourceMessage, KafkaHeaderParseError>,
1141 (PartitionId, MzOffset),
1142) {
1143 let pid = msg.partition();
1144 let Ok(offset) = u64::try_from(msg.offset()) else {
1145 panic!(
1146 "got negative offset ({}) from otherwise non-error'd kafka message",
1147 msg.offset()
1148 );
1149 };
1150
1151 let mut metadata = Row::default();
1152 let mut packer = metadata.packer();
1153 for kind in metadata_columns {
1154 match kind {
1155 KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
1156 KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
1157 KafkaMetadataKind::Timestamp => {
1158 let ts = msg
1159 .timestamp()
1160 .to_millis()
1161 .expect("kafka sources always have upstream_time");
1162
1163 let d: Datum = DateTime::from_timestamp_millis(ts)
1164 .and_then(|dt| {
1165 let ct: Option<CheckedTimestamp<NaiveDateTime>> =
1166 dt.naive_utc().try_into().ok();
1167 ct
1168 })
1169 .into();
1170 packer.push(d)
1171 }
1172 KafkaMetadataKind::Header { key, use_bytes } => {
1173 match msg.headers() {
1174 Some(headers) => {
1175 let d = headers
1176 .iter()
1177 .filter(|header| header.key == key)
1178 .last()
1179 .map(|header| match header.value {
1180 Some(v) => {
1181 if *use_bytes {
1182 Ok(Datum::Bytes(v))
1183 } else {
1184 match str::from_utf8(v) {
1185 Ok(str) => Ok(Datum::String(str)),
1186 Err(_) => Err(KafkaHeaderParseError::Utf8Error {
1187 key: key.clone(),
1188 raw: v.to_vec(),
1189 }),
1190 }
1191 }
1192 }
1193 None => Ok(Datum::Null),
1194 })
1195 .unwrap_or_else(|| {
1196 Err(KafkaHeaderParseError::KeyNotFound { key: key.clone() })
1197 });
1198 match d {
1199 Ok(d) => packer.push(d),
1200 Err(err) => return (Err(err), (pid, offset.into())),
1202 }
1203 }
1204 None => packer.push(Datum::Null),
1205 }
1206 }
1207 KafkaMetadataKind::Headers => {
1208 packer.push_list_with(|r| {
1209 if let Some(headers) = msg.headers() {
1210 for header in headers.iter() {
1211 match header.value {
1212 Some(v) => r.push_list_with(|record_row| {
1213 record_row.push(Datum::String(header.key));
1214 record_row.push(Datum::Bytes(v));
1215 }),
1216 None => r.push_list_with(|record_row| {
1217 record_row.push(Datum::String(header.key));
1218 record_row.push(Datum::Null);
1219 }),
1220 }
1221 }
1222 }
1223 });
1224 }
1225 }
1226 }
1227
1228 let key = match msg.key() {
1229 Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1230 None => Row::pack([Datum::Null]),
1231 };
1232 let value = match msg.payload() {
1233 Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1234 None => Row::pack([Datum::Null]),
1235 };
1236 (
1237 Ok(SourceMessage {
1238 key,
1239 value,
1240 metadata,
1241 }),
1242 (pid, offset.into()),
1243 )
1244}
1245
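/// A partition queue paired with the id of the partition it reads from.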
1246struct PartitionConsumer {
1248 pid: PartitionId,
1250 partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1252}
1253
1254impl PartitionConsumer {
1255 fn new(
1257 pid: PartitionId,
1258 partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1259 ) -> Self {
1260 PartitionConsumer {
1261 pid,
1262 partition_queue,
1263 }
1264 }
1265
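    /// Returns the next message from the partition queue, if one is
    /// immediately available, tagged with this consumer's partition id.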
1266 fn get_next_message(&self) -> Result<Option<(BorrowedMessage<'_>, PartitionId)>, KafkaError> {
1273 match self.partition_queue.poll(Duration::from_millis(0)) {
1274 Some(Ok(msg)) => Ok(Some((msg, self.pid))),
1275 Some(Err(err)) => Err(err),
1276 _ => Ok(None),
1277 }
1278 }
1279
1280 fn pid(&self) -> PartitionId {
1282 self.pid
1283 }
1284}
1285
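/// A consumer context that forwards librdkafka statistics to the source
/// operator and wakes it whenever the client signals activity.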
1286struct GlueConsumerContext {
1289 notificator: Arc<Notify>,
1290 stats_tx: crossbeam_channel::Sender<Jsonb>,
1291 inner: MzClientContext,
1292}
1293
1294impl ClientContext for GlueConsumerContext {
1295 fn stats_raw(&self, statistics: &[u8]) {
1296 match Jsonb::from_slice(statistics) {
1297 Ok(statistics) => {
1298 self.stats_tx
1299 .send(statistics)
1300 .expect("timely operator hung up while Kafka source active");
1301 self.activate();
1302 }
1303 Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
1304 };
1305 }
1306
1307 fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
1310 self.inner.log(level, fac, log_message)
1311 }
1312 fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
1313 self.inner.error(error, reason)
1314 }
1315}
1316
1317impl GlueConsumerContext {
1318 fn activate(&self) {
1319 self.notificator.notify_one();
1320 }
1321}
1322
1323impl ConsumerContext for GlueConsumerContext {}
1324
1325#[cfg(test)]
1326mod tests {
1327 use std::sync::Arc;
1328 use std::time::Duration;
1329
1330 use mz_kafka_util::client::create_new_client_config_simple;
1331 use rdkafka::consumer::{BaseConsumer, Consumer};
1332 use rdkafka::{Message, Offset, TopicPartitionList};
1333 use uuid::Uuid;
1334
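    /// Demonstrates that messages already fetched into the common consumer
    /// queue before `split_partition_queue` is called are delivered on the
    /// common queue rather than the partition queue. The test needs a Kafka
    /// broker on localhost:9092 with a populated `queue-test` topic, hence
    /// `#[ignore]`.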
1335 #[mz_ore::test]
1347 #[ignore]
1348 fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
1349 let topic_name = "queue-test";
1350 let pid = 0;
1351
1352 let mut kafka_config = create_new_client_config_simple();
1353 kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
1354 kafka_config.set("enable.auto.commit", "false");
1355 kafka_config.set("group.id", Uuid::new_v4().to_string());
1356 kafka_config.set("fetch.message.max.bytes", "100");
1357 let consumer: BaseConsumer<_> = kafka_config.create()?;
1358
1359 let consumer = Arc::new(consumer);
1360
1361 let mut partition_list = TopicPartitionList::new();
1362 partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;
1365
1366 consumer.assign(&partition_list)?;
1367
1368 let partition_queue = consumer
1369 .split_partition_queue(topic_name, pid)
1370 .expect("missing partition queue");
1371
1372 let expected_messages = 1_000;
1373
1374 let mut common_queue_count = 0;
1375 let mut partition_queue_count = 0;
1376
1377 loop {
1378 if let Some(msg) = consumer.poll(Duration::from_millis(0)) {
1379 match msg {
1380 Ok(msg) => {
1381 let _payload =
1382 std::str::from_utf8(msg.payload().expect("missing payload"))?;
1383 if partition_queue_count > 0 {
1384 anyhow::bail!(
1385 "Got message from common queue after we internally switched to partition queue."
1386 );
1387 }
1388
1389 common_queue_count += 1;
1390 }
1391 Err(err) => anyhow::bail!("{}", err),
1392 }
1393 }
1394
1395 match partition_queue.poll(Duration::from_millis(0)) {
1396 Some(Ok(msg)) => {
1397 let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
1398 partition_queue_count += 1;
1399 }
1400 Some(Err(err)) => anyhow::bail!("{}", err),
1401 _ => (),
1402 }
1403
1404 if (common_queue_count + partition_queue_count) == expected_messages {
1405 break;
1406 }
1407 }
1408
1409 assert!(
1410 common_queue_count == 0,
1411 "Got {} out of {} messages from common queue. Partition queue: {}",
1412 common_queue_count,
1413 expected_messages,
1414 partition_queue_count
1415 );
1416
1417 Ok(())
1418 }
1419}
1420
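/// Fetches the partitions of `topic` and their current high watermarks by
/// querying the broker for the end offset of every partition.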
1421fn fetch_partition_info<C: ConsumerContext>(
1423 consumer: &BaseConsumer<C>,
1424 topic: &str,
1425 fetch_timeout: Duration,
1426) -> Result<BTreeMap<PartitionId, HighWatermark>, GetPartitionsError> {
1427 let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;
1428
1429 let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
1430 for pid in pids {
1431 offset_requests.add_partition_offset(topic, pid, Offset::End)?;
1432 }
1433
1434 let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;
1435
1436 let mut result = BTreeMap::new();
1437 for entry in offset_responses.elements() {
1438 let offset = match entry.offset() {
1439 Offset::Offset(offset) => offset,
1440 offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
1441 };
1442
1443 let pid = entry.partition();
1444 let watermark = offset.try_into().expect("invalid negative offset");
1445 result.insert(pid, watermark);
1446 }
1447
1448 Ok(result)
1449}
1450
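/// An update produced by the metadata fetcher: the latest partition high
/// watermarks, a transient (retryable) error, or a definite error that
/// permanently fails the source.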
1451#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
1453enum MetadataUpdate {
1454 Partitions(BTreeMap<PartitionId, HighWatermark>),
1456 TransientError(HealthStatus),
1460 DefiniteError(SourceError),
1464}
1465
1466impl MetadataUpdate {
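    /// The upstream frontier implied by this update, if any: the frontier of
    /// reported high watermarks for partition updates, the empty frontier for
    /// definite errors, and `None` for transient errors.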
1467 fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
1469 match self {
1470 Self::Partitions(partitions) => {
1471 let max_pid = partitions.keys().last().copied();
1472 let lower = max_pid
1473 .map(RangeBound::after)
1474 .unwrap_or(RangeBound::NegInfinity);
1475 let future_ts =
1476 Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
1477
1478 let mut frontier = Antichain::from_elem(future_ts);
1479 for (pid, high_watermark) in partitions {
1480 frontier.insert(Partitioned::new_singleton(
1481 RangeBound::exact(*pid),
1482 MzOffset::from(*high_watermark),
1483 ));
1484 }
1485
1486 Some(frontier)
1487 }
1488 Self::DefiniteError(_) => Some(Antichain::new()),
1489 Self::TransientError(_) => None,
1490 }
1491 }
1492}
1493
1494#[derive(Debug, thiserror::Error)]
1495pub enum KafkaHeaderParseError {
1496 #[error("A header with key '{key}' was not found in the message headers")]
1497 KeyNotFound { key: String },
1498 #[error(
1499 "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
1500 )]
1501 Utf8Error { key: String, raw: Vec<u8> },
1502}
1503
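/// Renders the metadata fetcher operator of the Kafka source. On a single
/// worker it spawns a thread that periodically fetches the topic's partitions
/// and high watermarks and forwards the resulting updates, alongside probe
/// timestamps, to the reader operator and the probe output.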
1504fn render_metadata_fetcher<G: Scope<Timestamp = KafkaTimestamp>>(
1510 scope: &G,
1511 connection: KafkaSourceConnection,
1512 config: RawSourceCreationConfig,
1513) -> (
1514 Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
1515 Stream<G, Probe<KafkaTimestamp>>,
1516 PressOnDropButton,
1517) {
1518 let active_worker_id = usize::cast_from(config.id.hashed());
1519 let is_active_worker = active_worker_id % scope.peers() == scope.index();
1520
    let resume_upper = Antichain::from_iter(
        config
            .source_resume_uppers
            .values()
            .flat_map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row)),
    );
1528
1529 let name = format!("KafkaMetadataFetcher({})", config.id);
1530 let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
1531
1532 let (metadata_output, metadata_stream) = builder.new_output();
1533 let (probe_output, probe_stream) = builder.new_output();
1534
1535 let button = builder.build(move |caps| async move {
1536 if !is_active_worker {
1537 return;
1538 }
1539
1540 let [metadata_cap, probe_cap] = caps.try_into().unwrap();
1541
1542 let client_id = connection.client_id(
1543 config.config.config_set(),
1544 &config.config.connection_context,
1545 config.id,
1546 );
1547 let KafkaSourceConnection {
1548 connection,
1549 topic,
1550 topic_metadata_refresh_interval,
1551 ..
1552 } = connection;
1553
1554 let consumer: Result<BaseConsumer<_>, _> = connection
1555 .create_with_context(
1556 &config.config,
1557 MzClientContext::default(),
1558 &btreemap! {
1559 "topic.metadata.refresh.interval.ms" =>
1562 topic_metadata_refresh_interval
1563 .as_millis()
1564 .to_string(),
1565 "client.id" => format!("{client_id}-metadata"),
1568 },
1569 InTask::Yes,
1570 )
1571 .await;
1572
1573 let consumer = match consumer {
1574 Ok(consumer) => consumer,
1575 Err(e) => {
1576 let msg = format!(
1577 "failed creating kafka metadata consumer: {}",
1578 e.display_with_causes()
1579 );
1580 let status_update = HealthStatusUpdate::halting(msg, None);
1581 let status = match e {
1582 ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
1583 _ => HealthStatus::kafka(status_update),
1584 };
1585 let error = MetadataUpdate::TransientError(status);
1586 let timestamp = (config.now_fn)().into();
1587 metadata_output.give(&metadata_cap, (timestamp, error));
1588
1589 std::future::pending::<()>().await;
1593 unreachable!("pending future never returns");
1594 }
1595 };
1596
1597 let (tx, mut rx) = mpsc::unbounded_channel();
1598 spawn_metadata_thread(config, consumer, topic, tx);
1599
1600 let mut prev_upstream_frontier = resume_upper;
1601
1602 while let Some((timestamp, mut update)) = rx.recv().await {
1603 if prev_upstream_frontier.is_empty() {
1604 return;
1605 }
1606
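            // The upstream frontier must never regress. If it does, the topic
            // was most likely deleted and recreated, which we cannot recover
            // from, so downgrade the update to a definite error.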
1607 if let Some(upstream_frontier) = update.upstream_frontier() {
1608 if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
1618 let error = SourceError {
1619 error: SourceErrorDetails::Other("topic was recreated".into()),
1620 };
1621 update = MetadataUpdate::DefiniteError(error);
1622 }
1623 }
1624
1625 if let Some(upstream_frontier) = update.upstream_frontier() {
1626 prev_upstream_frontier = upstream_frontier.clone();
1627
1628 let probe = Probe {
1629 probe_ts: timestamp,
1630 upstream_frontier,
1631 };
1632 probe_output.give(&probe_cap, probe);
1633 }
1634
1635 metadata_output.give(&metadata_cap, (timestamp, update));
1636 }
1637 });
1638
1639 (metadata_stream, probe_stream, button.press_on_drop())
1640}
1641
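/// Spawns the blocking thread that periodically fetches partition metadata for
/// `topic` and forwards the resulting updates over `tx`. The thread exits once
/// the receiver is dropped.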
1642fn spawn_metadata_thread<C: ConsumerContext>(
1643 config: RawSourceCreationConfig,
1644 consumer: BaseConsumer<TunnelingClientContext<C>>,
1645 topic: String,
1646 tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
1647) {
1648 thread::Builder::new()
1650 .name(format!("kfk-mtdt-{}", config.id))
1651 .spawn(move || {
1652 trace!(
1653 source_id = config.id.to_string(),
1654 worker_id = config.worker_id,
1655 num_workers = config.worker_count,
1656 "kafka metadata thread: starting..."
1657 );
1658
1659 let mut ticker = probe::Ticker::new(
1660 || KAFKA_METADATA_FETCH_INTERVAL.get(config.config.config_set()),
1661 config.now_fn,
1662 );
1663
1664 loop {
1665 let probe_ts = ticker.tick_blocking();
1666 let result = fetch_partition_info(
1667 &consumer,
1668 &topic,
1669 config
1670 .config
1671 .parameters
1672 .kafka_timeout_config
1673 .fetch_metadata_timeout,
1674 );
1675 trace!(
1676 source_id = config.id.to_string(),
1677 worker_id = config.worker_id,
1678 num_workers = config.worker_count,
1679 "kafka metadata thread: metadata fetch result: {:?}",
1680 result
1681 );
1682 let update = match result {
1683 Ok(partitions) => {
1684 trace!(
1685 source_id = config.id.to_string(),
1686 worker_id = config.worker_id,
1687 num_workers = config.worker_count,
1688 "kafka metadata thread: fetched partition metadata info",
1689 );
1690
1691 MetadataUpdate::Partitions(partitions)
1692 }
1693 Err(GetPartitionsError::TopicDoesNotExist) => {
1694 let error = SourceError {
1695 error: SourceErrorDetails::Other("topic was deleted".into()),
1696 };
1697 MetadataUpdate::DefiniteError(error)
1698 }
1699 Err(e) => {
1700 let kafka_status = Some(HealthStatusUpdate::stalled(
1701 format!("{}", e.display_with_causes()),
1702 None,
1703 ));
1704
1705 let ssh_status = consumer.client().context().tunnel_status();
1706 let ssh_status = match ssh_status {
1707 SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
1708 SshTunnelStatus::Errored(e) => {
1709 Some(HealthStatusUpdate::stalled(e, None))
1710 }
1711 };
1712
1713 MetadataUpdate::TransientError(HealthStatus {
1714 kafka: kafka_status,
1715 ssh: ssh_status,
1716 })
1717 }
1718 };
1719
1720 if tx.send((probe_ts, update)).is_err() {
1721 break;
1722 }
1723 }
1724
1725 info!(
1726 source_id = config.id.to_string(),
1727 worker_id = config.worker_id,
1728 num_workers = config.worker_count,
1729 "kafka metadata thread: receiver has gone away; shutting down."
1730 )
1731 })
1732 .unwrap();
1733}