1use std::collections::BTreeMap;
11use std::collections::btree_map::Entry;
12use std::str::{self};
13use std::sync::Arc;
14use std::thread;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use chrono::{DateTime, NaiveDateTime};
19use differential_dataflow::{AsCollection, Hashable};
20use futures::StreamExt;
21use itertools::Itertools;
22use maplit::btreemap;
23use mz_kafka_util::client::{
24 GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
25};
26use mz_ore::assert_none;
27use mz_ore::cast::CastFrom;
28use mz_ore::error::ErrorExt;
29use mz_ore::future::InTask;
30use mz_ore::iter::IteratorExt;
31use mz_repr::adt::timestamp::CheckedTimestamp;
32use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
33use mz_ssh_util::tunnel::SshTunnelStatus;
34use mz_storage_types::errors::{
35 ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
36};
37use mz_storage_types::sources::kafka::{
38 KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
39};
40use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
41use mz_timely_util::antichain::AntichainExt;
42use mz_timely_util::builder_async::{
43 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
44};
45use mz_timely_util::containers::stack::AccountedStackBuilder;
46use mz_timely_util::order::Partitioned;
47use rdkafka::consumer::base_consumer::PartitionQueue;
48use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
49use rdkafka::error::KafkaError;
50use rdkafka::message::{BorrowedMessage, Headers};
51use rdkafka::statistics::Statistics;
52use rdkafka::topic_partition_list::Offset;
53use rdkafka::{ClientContext, Message, TopicPartitionList};
54use serde::{Deserialize, Serialize};
55use timely::PartialOrder;
56use timely::container::CapacityContainerBuilder;
57use timely::dataflow::channels::pact::Pipeline;
58use timely::dataflow::operators::Capability;
59use timely::dataflow::operators::core::Partition;
60use timely::dataflow::operators::vec::Broadcast;
61use timely::dataflow::{Scope, StreamVec};
62use timely::progress::Antichain;
63use timely::progress::Timestamp;
64use tokio::sync::{Notify, mpsc};
65use tracing::{error, info, trace};
66
67use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
68use crate::metrics::source::kafka::KafkaSourceMetrics;
69use crate::source::types::{Probe, SignaledFuture, SourceRender, StackedCollection};
70use crate::source::{RawSourceCreationConfig, SourceMessage, probe};
71use crate::statistics::SourceStatistics;
72
73#[derive(
74 Clone,
75 Debug,
76 Default,
77 PartialEq,
78 Eq,
79 PartialOrd,
80 Ord,
81 Serialize,
82 Deserialize
83)]
84struct HealthStatus {
85 kafka: Option<HealthStatusUpdate>,
86 ssh: Option<HealthStatusUpdate>,
87}
88
89impl HealthStatus {
90 fn kafka(update: HealthStatusUpdate) -> Self {
91 Self {
92 kafka: Some(update),
93 ssh: None,
94 }
95 }
96
97 fn ssh(update: HealthStatusUpdate) -> Self {
98 Self {
99 kafka: None,
100 ssh: Some(update),
101 }
102 }
103}
104
/// Per-worker state of the Kafka source reader operator.
pub struct KafkaSourceReader {
    /// Name of the topic this source reads from.
    topic_name: String,
    /// Name of the source; used in log and error messages.
    source_name: String,
    /// Global ID of the source.
    id: GlobalId,
    /// The underlying librdkafka consumer; shared with the split partition
    /// queues and the offset committer.
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    /// One consumer per split-off partition queue.
    partition_consumers: Vec<PartitionConsumer>,
    /// The ID of the timely worker running this operator.
    worker_id: usize,
    /// The total number of timely workers.
    worker_count: usize,
    /// For each output index, the last processed offset per partition; used by
    /// `handle_message` to detect and skip duplicate/replayed messages.
    last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
    /// The offset at which to start reading each partition.
    start_offsets: BTreeMap<PartitionId, i64>,
    /// Receives librdkafka statistics blobs forwarded by `GlueConsumerContext`.
    stats_rx: crossbeam_channel::Receiver<Jsonb>,
    /// Per-partition source metrics.
    partition_metrics: KafkaSourceMetrics,
    /// Per-partition capabilities used to emit data at the right frontier.
    partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
}
134
/// Capabilities held on behalf of a single partition.
struct PartitionCapability {
    /// The capability of the data output for this partition.
    data: Capability<KafkaTimestamp>,
}
139
/// The high watermark offset of a Kafka partition.
// NOTE(review): assumed to be the broker-reported high watermark (one past the
// last available message); the producing metadata fetcher is not visible in
// this chunk — confirm there. It is summed into `offset_known` by the reader.
type HighWatermark = u64;
144
/// Processes resume-upper frontiers of a Kafka source: commits the
/// corresponding partition offsets upstream and updates the
/// `offset_committed` statistic of every export.
pub struct KafkaResumeUpperProcessor {
    /// Configuration of the source this processor belongs to.
    config: RawSourceCreationConfig,
    /// Topic to commit offsets for.
    topic_name: String,
    /// Consumer used to perform the commit; shared with the reader.
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    /// Statistics handles of all exports, updated with committed offsets.
    statistics: Vec<SourceStatistics>,
}
153
154fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
158 let pid = usize::try_from(pid).expect("positive pid");
159 ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
160}
161
/// Per-export information needed by the reader operator.
struct SourceOutputInfo {
    /// The global ID of the source export.
    id: GlobalId,
    /// Index of the multiplexed output this export's data is tagged with.
    output_index: usize,
    /// Frontier at which this export resumes.
    resume_upper: Antichain<KafkaTimestamp>,
    /// The metadata columns requested by this export, in output order.
    metadata_columns: Vec<KafkaMetadataKind>,
}
168
impl SourceRender for KafkaSourceConnection {
    type Time = KafkaTimestamp;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;

    /// Renders the Kafka source dataflow: a metadata fetcher operator plus a
    /// reader operator whose single multiplexed output is demultiplexed into
    /// one collection per source export.
    fn render<'scope>(
        self,
        scope: Scope<'scope, KafkaTimestamp>,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<
            GlobalId,
            StackedCollection<'scope, KafkaTimestamp, Result<SourceMessage, DataflowError>>,
        >,
        StreamVec<'scope, KafkaTimestamp, HealthStatusMessage>,
        StreamVec<'scope, KafkaTimestamp, Probe<KafkaTimestamp>>,
        Vec<PressOnDropButton>,
    ) {
        let (metadata, probes, metadata_token) =
            render_metadata_fetcher(scope, self.clone(), config.clone());
        let (data, health, reader_token) = render_reader(
            scope,
            self,
            config.clone(),
            resume_uppers,
            metadata,
            start_signal,
        );

        // Route each record to the partition matching its output index so that
        // every export gets its own stream.
        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
            partition_count,
            |((output, data), time, diff)| {
                let output = u64::cast_from(*output);
                (output, (data.clone(), time.clone(), diff.clone()))
            },
        );
        // Partition streams come back in index order, which matches the
        // enumeration order used to assign output indexes in `render_reader`;
        // `zip_eq` asserts the lengths agree.
        let mut data_collections = BTreeMap::new();
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        (
            data_collections,
            health,
            probes,
            vec![metadata_token, reader_token],
        )
    }
}
225
/// Renders the reader operator of a Kafka source: creates the consumer, reads
/// messages from per-partition queues, emits decoded `SourceMessage`s tagged
/// with their export's output index, reports health for the source and each
/// export, and commits resume frontiers upstream.
fn render_reader<'scope>(
    scope: Scope<'scope, KafkaTimestamp>,
    connection: KafkaSourceConnection,
    config: RawSourceCreationConfig,
    resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
    metadata_stream: StreamVec<'scope, KafkaTimestamp, (mz_repr::Timestamp, MetadataUpdate)>,
    start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
    StackedCollection<'scope, KafkaTimestamp, (usize, Result<SourceMessage, DataflowError>)>,
    StreamVec<'scope, KafkaTimestamp, HealthStatusMessage>,
    PressOnDropButton,
) {
    let name = format!("KafkaReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());

    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();

    // Metadata updates are broadcast so every worker observes partition
    // counts and watermarks.
    let mut metadata_input = builder.new_disconnected_input(metadata_stream.broadcast(), Pipeline);

    let mut outputs = vec![];

    let mut all_export_stats = vec![];
    // Statistics of exports that are still performing their initial snapshot.
    let mut snapshot_export_stats = vec![];
    for (idx, (id, export)) in config.source_exports.iter().enumerate() {
        let SourceExport {
            details,
            storage_metadata: _,
            data_config: _,
        } = export;
        let resume_upper = Antichain::from_iter(
            config
                .source_resume_uppers
                .get(id)
                .expect("all source exports must be present in source resume uppers")
                .iter()
                .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
        );

        let metadata_columns = match details {
            SourceExportDetails::Kafka(details) => details
                .metadata_columns
                .iter()
                .map(|(_name, kind)| kind.clone())
                .collect::<Vec<_>>(),
            _ => panic!("unexpected source export details: {:?}", details),
        };

        let statistics = config
            .statistics
            .get(id)
            .expect("statistics have been initialized")
            .clone();
        // An export is still snapshotting iff its resume upper is the minimum
        // frontier.
        if resume_upper.as_ref() == &[Partitioned::minimum()] {
            snapshot_export_stats.push(statistics.clone());
        }
        all_export_stats.push(statistics);

        let output = SourceOutputInfo {
            id: *id,
            resume_upper,
            output_index: idx,
            metadata_columns,
        };
        outputs.push(output);
    }

    let busy_signal = Arc::clone(&config.busy_signal);
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut data_cap, health_cap] = caps.try_into().unwrap();

            let client_id = connection.client_id(
                config.config.config_set(),
                &config.config.connection_context,
                config.id,
            );
            let group_id = connection.group_id(&config.config.connection_context, config.id);
            let KafkaSourceConnection {
                connection,
                topic,
                topic_metadata_refresh_interval,
                start_offsets,
                metadata_columns: _,
                connection_id: _,
                group_id_prefix: _,
            } = connection;

            // Start offsets configured by the user, restricted to the
            // partitions this worker is responsible for.
            let mut start_offsets: BTreeMap<_, i64> = start_offsets
                .clone()
                .into_iter()
                .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
                .map(|(k, v)| (k, v))
                .collect();

            let mut partition_capabilities = BTreeMap::new();
            let mut max_pid = None;
            // The combined resume upper over all exports.
            let resume_upper = Antichain::from_iter(
                outputs
                    .iter()
                    .map(|output| output.resume_upper.clone())
                    .flatten(),
            );

            // Advance configured start offsets to the restored resume frontier
            // wherever the latter is further along, and mint a capability for
            // each partition this worker is responsible for.
            for ts in resume_upper.elements() {
                if let Some(pid) = ts.interval().singleton() {
                    let pid = pid.unwrap_exact();
                    max_pid = std::cmp::max(max_pid, Some(*pid));
                    if responsible_for_pid(&config, *pid) {
                        let restored_offset = i64::try_from(ts.timestamp().offset)
                            .expect("restored kafka offsets must fit into i64");
                        if let Some(start_offset) = start_offsets.get_mut(pid) {
                            *start_offset = std::cmp::max(restored_offset, *start_offset);
                        } else {
                            start_offsets.insert(*pid, restored_offset);
                        }

                        let part_ts = Partitioned::new_singleton(
                            RangeBound::exact(*pid),
                            ts.timestamp().clone(),
                        );
                        let part_cap = PartitionCapability {
                            data: data_cap.delayed(&part_ts),
                        };
                        partition_capabilities.insert(*pid, part_cap);
                    }
                }
            }
            // Hold the main capability at the range beyond the maximum known
            // partition, so capabilities for newly discovered partitions can
            // still be minted from it.
            let lower = max_pid
                .map(RangeBound::after)
                .unwrap_or(RangeBound::NegInfinity);
            let future_ts =
                Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
            data_cap.downgrade(&future_ts);

            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "instantiating Kafka source reader at offsets {start_offsets:?}"
            );

            let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
            let notificator = Arc::new(Notify::new());

            let consumer: Result<BaseConsumer<_>, _> = connection
                .create_with_context(
                    &config.config,
                    GlueConsumerContext {
                        notificator: Arc::clone(&notificator),
                        stats_tx,
                        inner: MzClientContext::default(),
                    },
                    &btreemap! {
                        // Offsets are committed by `KafkaResumeUpperProcessor`,
                        // not by librdkafka's auto-commit.
                        "enable.auto.commit" => "false".into(),
                        "auto.offset.reset" => "earliest".into(),
                        "topic.metadata.refresh.interval.ms" =>
                            topic_metadata_refresh_interval
                                .as_millis()
                                .to_string(),
                        "fetch.message.max.bytes" => "134217728".into(),
                        "group.id" => group_id.clone(),
                        "client.id" => client_id.clone(),
                    },
                    InTask::Yes,
                )
                .await;

            let consumer = match consumer {
                Ok(consumer) => Arc::new(consumer),
                Err(e) => {
                    let update = HealthStatusUpdate::halting(
                        format!(
                            "failed creating kafka reader consumer: {}",
                            e.display_with_causes()
                        ),
                        None,
                    );
                    // Report the failure both at the source level and for
                    // every individual export, in the namespace matching the
                    // failure's origin.
                    health_output.give(
                        &health_cap,
                        HealthStatusMessage {
                            id: None,
                            namespace: if matches!(e, ContextCreationError::Ssh(_)) {
                                StatusNamespace::Ssh
                            } else {
                                StatusNamespace::Kafka
                            },
                            update: update.clone(),
                        },
                    );
                    for (output, update) in outputs.iter().repeat_clone(update) {
                        health_output.give(
                            &health_cap,
                            HealthStatusMessage {
                                id: Some(output.id),
                                namespace: if matches!(e, ContextCreationError::Ssh(_)) {
                                    StatusNamespace::Ssh
                                } else {
                                    StatusNamespace::Kafka
                                },
                                update,
                            },
                        );
                    }
                    // The status is "halting"; park forever and wait to be
                    // torn down rather than returning (which would drop the
                    // health capability).
                    std::future::pending::<()>().await;
                    unreachable!("pending future never returns");
                }
            };

            // Wait until we are allowed to start reading (e.g. downstream
            // rehydration has finished).
            start_signal.await;
            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka worker noticed rehydration is finished, starting partition queues..."
            );

            let partition_ids = start_offsets.keys().copied().collect();
            let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);

            let mut reader = KafkaSourceReader {
                topic_name: topic.clone(),
                source_name: config.name.clone(),
                id: config.id,
                partition_consumers: Vec::new(),
                consumer: Arc::clone(&consumer),
                worker_id: config.worker_id,
                worker_count: config.worker_count,
                last_offsets: outputs
                    .iter()
                    .map(|output| (output.output_index, BTreeMap::new()))
                    .collect(),
                start_offsets,
                stats_rx,
                partition_metrics: config.metrics.get_kafka_source_metrics(
                    partition_ids,
                    topic.clone(),
                    config.id,
                ),
                partition_capabilities,
            };

            let offset_committer = KafkaResumeUpperProcessor {
                config: config.clone(),
                topic_name: topic.clone(),
                consumer,
                statistics: all_export_stats.clone(),
            };

            // If any export is snapshotting, seed the committer with the
            // initial resume upper and reset snapshot statistics for a fresh
            // (re)start.
            if !snapshot_export_stats.is_empty() {
                if let Err(e) = offset_committer
                    .process_frontier(resume_upper.clone())
                    .await
                {
                    offset_commit_metrics.offset_commit_failures.inc();
                    tracing::warn!(
                        %e,
                        "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
                        worker_id = config.worker_id,
                        source_id = config.id,
                        upper = resume_upper.pretty()
                    );
                }
                for statistics in config.statistics.values() {
                    statistics.set_snapshot_records_known(0);
                    statistics.set_snapshot_records_staged(0);
                }
            }

            // Commit new resume frontiers as they arrive. This future never
            // resolves; it is raced against the other select arms below.
            let resume_uppers_process_loop = async move {
                tokio::pin!(resume_uppers);
                while let Some(frontier) = resume_uppers.next().await {
                    if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
                        offset_commit_metrics.offset_commit_failures.inc();
                        tracing::warn!(
                            %e,
                            "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
                            worker_id = config.worker_id,
                            source_id = config.id,
                            upper = frontier.pretty()
                        );
                    }
                }
                std::future::pending::<()>().await;
            };
            tokio::pin!(resume_uppers_process_loop);

            let mut metadata_update: Option<MetadataUpdate> = None;
            let mut snapshot_total = None;

            let max_wait_time =
                mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
            loop {
                tokio::select! {
                    // Wake up when data arrives, or at least every
                    // `max_wait_time` to re-poll the consumer.
                    _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},

                    _ = metadata_input.ready() => {
                        // Only the most recent metadata update matters; drain
                        // the input and keep the one with the greatest
                        // timestamp.
                        let mut updates = Vec::new();
                        while let Some(event) = metadata_input.next_sync() {
                            if let Event::Data(_, mut data) = event {
                                updates.append(&mut data);
                            }
                        }
                        metadata_update = updates
                            .into_iter()
                            .max_by_key(|(ts, _)| *ts)
                            .map(|(_, update)| update);
                    }

                    // This arm never completes; polling it drives the offset
                    // commit loop.
                    _ = resume_uppers_process_loop.as_mut() => {},
                }

                match metadata_update.take() {
                    Some(MetadataUpdate::Partitions(partitions)) => {
                        let max_pid = partitions.keys().last().cloned();
                        let lower = max_pid
                            .map(RangeBound::after)
                            .unwrap_or(RangeBound::NegInfinity);
                        let future_ts = Partitioned::new_range(
                            lower,
                            RangeBound::PosInfinity,
                            MzOffset::from(0),
                        );

                        let mut offset_known = 0;
                        for (&pid, &high_watermark) in &partitions {
                            if responsible_for_pid(&config, pid) {
                                offset_known += high_watermark;
                                reader.ensure_partition(pid);
                                // Newly discovered partitions get a capability
                                // minted at their configured start offset.
                                if let Entry::Vacant(entry) =
                                    reader.partition_capabilities.entry(pid)
                                {
                                    let start_offset = match reader.start_offsets.get(&pid) {
                                        Some(&offset) => offset.try_into().unwrap(),
                                        None => 0u64,
                                    };
                                    let part_since_ts = Partitioned::new_singleton(
                                        RangeBound::exact(pid),
                                        MzOffset::from(start_offset),
                                    );

                                    entry.insert(PartitionCapability {
                                        data: data_cap.delayed(&part_since_ts),
                                    });
                                }
                            }
                        }

                        // Latch the snapshot size the first time we learn the
                        // high watermarks while still snapshotting.
                        if !snapshot_export_stats.is_empty() && snapshot_total.is_none() {
                            snapshot_total = Some(offset_known);
                        }

                        // Receiving partition metadata implies connectivity,
                        // so mark both namespaces as running for the source
                        // and each export.
                        for output in &outputs {
                            for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace,
                                        update: HealthStatusUpdate::running(),
                                    },
                                );
                            }
                        }
                        for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace,
                                    update: HealthStatusUpdate::running(),
                                },
                            );
                        }

                        for export_stat in all_export_stats.iter() {
                            export_stat.set_offset_known(offset_known);
                        }

                        data_cap.downgrade(&future_ts);
                    }
                    Some(MetadataUpdate::TransientError(status)) => {
                        if let Some(update) = status.kafka {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Kafka,
                                    update: update.clone(),
                                },
                            );
                            for (output, update) in outputs.iter().repeat_clone(update) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Kafka,
                                        update,
                                    },
                                );
                            }
                        }
                        if let Some(update) = status.ssh {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Ssh,
                                    update: update.clone(),
                                },
                            );
                            for (output, update) in outputs.iter().repeat_clone(update) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Ssh,
                                        update,
                                    },
                                );
                            }
                        }
                    }
                    Some(MetadataUpdate::DefiniteError(error)) => {
                        // A definite error is unrecoverable: stall the health
                        // status, emit the error to every output, and shut
                        // down the operator.
                        health_output.give(
                            &health_cap,
                            HealthStatusMessage {
                                id: None,
                                namespace: StatusNamespace::Kafka,
                                update: HealthStatusUpdate::stalled(
                                    error.to_string(),
                                    None,
                                ),
                            },
                        );
                        let error = Err(error.into());
                        let time = data_cap.time().clone();
                        for (output, error) in
                            outputs.iter().map(|o| o.output_index).repeat_clone(error)
                        {
                            data_output
                                .give_fueled(&data_cap, ((output, error), time, Diff::ONE))
                                .await;
                        }

                        return;
                    }
                    None => {}
                }

                // Poll the shared consumer for errors and any stray messages
                // not routed through the split partition queues.
                while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
                    match result {
                        Err(e) => {
                            let error = format!(
                                "kafka error when polling consumer for source: {} topic: {} : {}",
                                reader.source_name, reader.topic_name, e
                            );
                            let status = HealthStatusUpdate::stalled(error, None);
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Kafka,
                                    update: status.clone(),
                                },
                            );
                            for (output, status) in outputs.iter().repeat_clone(status) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Kafka,
                                        update: status,
                                    },
                                );
                            }
                        }
                        Ok(message) => {
                            // Each export decodes the message with its own
                            // metadata columns.
                            let output_messages = outputs
                                .iter()
                                .map(|output| {
                                    let (message, ts) = construct_source_message(
                                        &message,
                                        &output.metadata_columns,
                                    );
                                    (output.output_index, message, ts)
                                })
                                .collect::<Vec<_>>();
                            for (output_index, message, ts) in output_messages {
                                if let Some((msg, time, diff)) =
                                    reader.handle_message(message, ts, &output_index)
                                {
                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
                                    let part_cap = &reader.partition_capabilities[pid].data;
                                    let msg = msg.map_err(|e| {
                                        DataflowError::SourceError(Box::new(SourceError {
                                            error: SourceErrorDetails::Other(e.to_string().into()),
                                        }))
                                    });
                                    data_output
                                        .give_fueled(part_cap, ((output_index, msg), time, diff))
                                        .await;
                                }
                            }
                        }
                    }
                }

                reader.update_stats();

                // Take the consumers out of `reader` so both `reader` and each
                // consumer can be borrowed mutably in the loop below.
                let mut consumers = std::mem::take(&mut reader.partition_consumers);
                for consumer in consumers.iter_mut() {
                    let pid = consumer.pid();
                    // Bound the work per partition so one hot partition cannot
                    // starve the others; if we stop early, re-notify ourselves
                    // so the remaining messages are picked up promptly.
                    let mut partition_exhausted = false;
                    for _ in 0..10_000 {
                        let Some(message) = consumer.get_next_message().transpose() else {
                            partition_exhausted = true;
                            break;
                        };

                        for output in outputs.iter() {
                            let message = match &message {
                                Ok((msg, pid)) => {
                                    let (msg, ts) =
                                        construct_source_message(msg, &output.metadata_columns);
                                    assert_eq!(*pid, ts.0);
                                    Ok(reader.handle_message(msg, ts, &output.output_index))
                                }
                                Err(err) => Err(err),
                            };
                            match message {
                                Ok(Some((msg, time, diff))) => {
                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
                                    let part_cap = &reader.partition_capabilities[pid].data;
                                    let msg = msg.map_err(|e| {
                                        DataflowError::SourceError(Box::new(SourceError {
                                            error: SourceErrorDetails::Other(e.to_string().into()),
                                        }))
                                    });
                                    data_output
                                        .give_fueled(
                                            part_cap,
                                            ((output.output_index, msg), time, diff),
                                        )
                                        .await;
                                }
                                // The message was at or before the last
                                // processed offset and got skipped.
                                Ok(None) => continue,
                                Err(err) => {
                                    let last_offset = reader
                                        .last_offsets
                                        .get(&output.output_index)
                                        .expect("output known to be installed")
                                        .get(&pid)
                                        .expect("partition known to be installed");

                                    let status = HealthStatusUpdate::stalled(
                                        format!(
                                            "error consuming from source: {} topic: {topic}:\
                                            partition: {pid} last processed offset:\
                                            {last_offset} : {err}",
                                            config.name
                                        ),
                                        None,
                                    );
                                    health_output.give(
                                        &health_cap,
                                        HealthStatusMessage {
                                            id: None,
                                            namespace: StatusNamespace::Kafka,
                                            update: status.clone(),
                                        },
                                    );
                                    health_output.give(
                                        &health_cap,
                                        HealthStatusMessage {
                                            id: Some(output.id),
                                            namespace: StatusNamespace::Kafka,
                                            update: status,
                                        },
                                    );
                                }
                            }
                        }
                    }
                    if !partition_exhausted {
                        notificator.notify_one();
                    }
                }
                // Put the consumers back; none can have been created while
                // they were taken out.
                assert!(reader.partition_consumers.is_empty());
                reader.partition_consumers = consumers;

                // Downgrade each partition's capability to the consumer's
                // current position, i.e. the offset of the next message.
                let positions = reader.consumer.position().unwrap();
                let topic_positions = positions.elements_for_topic(&reader.topic_name);
                let mut snapshot_staged = 0;

                for position in topic_positions {
                    if let Offset::Offset(offset) = position.offset() {
                        let pid = position.partition();
                        let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
                        let upper =
                            Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);

                        let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
                        match part_cap.data.try_downgrade(&upper) {
                            Ok(()) => {
                                if !snapshot_export_stats.is_empty() {
                                    snapshot_staged += offset.try_into().unwrap_or(0u64);
                                    // Never report more staged than known, even
                                    // if the watermark advanced since the total
                                    // was latched.
                                    if let Some(snapshot_total) = snapshot_total {
                                        snapshot_staged =
                                            std::cmp::min(snapshot_staged, snapshot_total);
                                    }
                                }
                            }
                            Err(_) => {
                                info!(
                                    source_id = config.id.to_string(),
                                    worker_id = config.worker_id,
                                    num_workers = config.worker_count,
                                    "kafka source frontier downgrade skipped due to already \
                                    seen offset: {:?}",
                                    upper
                                );
                            }
                        };
                    }
                }

                // Report snapshot progress and stop tracking the exports once
                // their snapshot is complete.
                if let (Some(snapshot_total), true) =
                    (snapshot_total, !snapshot_export_stats.is_empty())
                {
                    for export_stat in snapshot_export_stats.iter() {
                        export_stat.set_snapshot_records_known(snapshot_total);
                        export_stat.set_snapshot_records_staged(snapshot_staged);
                    }
                    if snapshot_total == snapshot_staged {
                        snapshot_export_stats.clear();
                    }
                }
            }
        })
    });

    (
        stream.as_collection(),
        health_stream,
        button.press_on_drop(),
    )
}
957
impl KafkaResumeUpperProcessor {
    /// Commits to Kafka the offsets described by `frontier` for the partitions
    /// this worker is responsible for, and updates the `offset_committed`
    /// statistic of every export.
    async fn process_frontier(
        &self,
        frontier: Antichain<KafkaTimestamp>,
    ) -> Result<(), anyhow::Error> {
        use rdkafka::consumer::CommitMode;

        // Generate the offsets for the partitions this worker is responsible
        // for; `offset_committed` is the sum over those partitions.
        let mut offsets = vec![];
        let mut offset_committed = 0;
        for ts in frontier.iter() {
            if let Some(pid) = ts.interval().singleton() {
                let pid = pid.unwrap_exact();
                if responsible_for_pid(&self.config, *pid) {
                    offsets.push((pid.clone(), *ts.timestamp()));

                    offset_committed += ts.timestamp().offset;
                }
            }
        }

        for export_stat in self.statistics.iter() {
            export_stat.set_offset_committed(offset_committed);
        }

        if !offsets.is_empty() {
            let mut tpl = TopicPartitionList::new();
            for (pid, offset) in offsets {
                let offset_to_commit =
                    Offset::Offset(offset.offset.try_into().expect("offset to be vald i64"));
                tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
                    .expect("offset known to be valid");
            }
            let consumer = Arc::clone(&self.consumer);
            // `commit` with `CommitMode::Sync` blocks, so run it on a
            // dedicated blocking task rather than the async worker.
            mz_ore::task::spawn_blocking(
                || format!("source({}) kafka offset commit", self.config.id),
                move || consumer.commit(&tpl, CommitMode::Sync),
            )
            .await?;
        }
        Ok(())
    }
}
1005
impl KafkaSourceReader {
    /// Ensures a partition queue exists for `pid`, creating one (and seeding
    /// `last_offsets` for every output) if it does not.
    fn ensure_partition(&mut self, pid: PartitionId) {
        // With no outputs there is nowhere to send data, so don't bother
        // creating a partition queue at all.
        if self.last_offsets.is_empty() {
            tracing::info!(
                source_id = %self.id,
                worker_id = %self.worker_id,
                "kafka source does not have any outputs, not creating partition queue"
            );

            return;
        }
        // All outputs track the same partition set, so finding `pid` in any
        // output's map means the queue already exists.
        for last_offsets in self.last_offsets.values() {
            if last_offsets.contains_key(&pid) {
                return;
            }
        }

        let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
        self.create_partition_queue(pid, Offset::Offset(start_offset));

        // Record the partition for every output; `start_offset - 1` acts as
        // the "last processed" offset before any message has been read.
        for last_offsets in self.last_offsets.values_mut() {
            let prev = last_offsets.insert(pid, start_offset - 1);
            assert_none!(prev);
        }
    }

    /// Creates a new partition queue for `partition_id` starting at
    /// `initial_offset` and registers it with the shared consumer.
    fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
        info!(
            source_id = self.id.to_string(),
            worker_id = self.worker_id,
            num_workers = self.worker_count,
            "activating Kafka queue for topic {}, partition {}",
            self.topic_name,
            partition_id,
        );

        // Extend the consumer's assignment with the new partition, preserving
        // the offsets of the partitions that are already assigned.
        let tpl = self.consumer.assignment().unwrap();
        let mut partition_list = TopicPartitionList::new();
        for partition in tpl.elements_for_topic(&self.topic_name) {
            partition_list
                .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
                .expect("offset known to be valid");
        }
        partition_list
            .add_partition_offset(&self.topic_name, partition_id, initial_offset)
            .expect("offset known to be valid");
        self.consumer
            .assign(&partition_list)
            .expect("assignment known to be valid");

        // Re-split the queues of all existing partition consumers and
        // re-register their non-empty callbacks.
        // NOTE(review): this presumably compensates for `assign` invalidating
        // previously split queues — confirm against rdkafka's semantics.
        let context = Arc::clone(self.consumer.context());
        for pc in &mut self.partition_consumers {
            pc.partition_queue = self
                .consumer
                .split_partition_queue(&self.topic_name, pc.pid)
                .expect("partition known to be valid");
            pc.partition_queue.set_nonempty_callback({
                let context = Arc::clone(&context);
                move || context.inner().activate()
            });
        }

        let mut partition_queue = self
            .consumer
            .split_partition_queue(&self.topic_name, partition_id)
            .expect("partition known to be valid");
        // Wake the operator whenever new data arrives in the queue.
        partition_queue.set_nonempty_callback(move || context.inner().activate());
        self.partition_consumers
            .push(PartitionConsumer::new(partition_id, partition_queue));
        // Invariant: one partition consumer per assigned partition.
        assert_eq!(
            self.consumer
                .assignment()
                .unwrap()
                .elements_for_topic(&self.topic_name)
                .len(),
            self.partition_consumers.len()
        );
    }

    /// Drains the statistics channel and updates per-partition metrics with
    /// the latest librdkafka-reported high offsets.
    fn update_stats(&mut self) {
        while let Ok(stats) = self.stats_rx.try_recv() {
            match serde_json::from_str::<Statistics>(&stats.to_string()) {
                Ok(statistics) => {
                    let topic = statistics.topics.get(&self.topic_name);
                    match topic {
                        Some(topic) => {
                            for (id, partition) in &topic.partitions {
                                self.partition_metrics
                                    .set_offset_max(*id, partition.hi_offset);
                            }
                        }
                        None => error!("No stats found for topic: {}", &self.topic_name),
                    }
                }
                Err(e) => {
                    error!("failed decoding librdkafka statistics JSON: {}", e);
                }
            }
        }
    }

    /// Checks whether the given message is viable for emission: returns the
    /// message with its timestamp and diff if its offset is past the last
    /// processed offset for the (output, partition), updating that bookkeeping;
    /// returns `None` (and logs) for duplicate/replayed messages.
    fn handle_message(
        &mut self,
        message: Result<SourceMessage, KafkaHeaderParseError>,
        (partition, offset): (PartitionId, MzOffset),
        output_index: &usize,
    ) -> Option<(
        Result<SourceMessage, KafkaHeaderParseError>,
        KafkaTimestamp,
        Diff,
    )> {
        // `ensure_partition` must have installed this partition for every
        // output before messages from it are handled.
        assert!(
            self.last_offsets
                .get(output_index)
                .unwrap()
                .contains_key(&partition)
        );

        let last_offset_ref = self
            .last_offsets
            .get_mut(output_index)
            .expect("output known to be installed")
            .get_mut(&partition)
            .expect("partition known to be installed");

        let last_offset = *last_offset_ref;
        let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
        if offset_as_i64 <= last_offset {
            // At or before the last processed offset: skip to avoid emitting
            // duplicates.
            info!(
                source_id = self.id.to_string(),
                worker_id = self.worker_id,
                num_workers = self.worker_count,
                "kafka message before expected offset: \
                source {} (reading topic {}, partition {}, output {}) \
                received offset {} expected offset {:?}",
                self.source_name,
                self.topic_name,
                partition,
                output_index,
                offset.offset,
                last_offset + 1,
            );
            None
        } else {
            *last_offset_ref = offset_as_i64;

            let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
            Some((message, ts, Diff::ONE))
        }
    }
}
1179
/// Builds a `SourceMessage` from a raw Kafka message, materializing the
/// requested metadata columns, and returns it together with the message's
/// (partition, offset) coordinates.
///
/// Returns an `Err` when a requested header key is missing or its value is
/// not valid UTF-8 while a text header was requested.
fn construct_source_message(
    msg: &BorrowedMessage<'_>,
    metadata_columns: &[KafkaMetadataKind],
) -> (
    Result<SourceMessage, KafkaHeaderParseError>,
    (PartitionId, MzOffset),
) {
    let pid = msg.partition();
    // Non-error messages must carry a non-negative offset.
    let Ok(offset) = u64::try_from(msg.offset()) else {
        panic!(
            "got negative offset ({}) from otherwise non-error'd kafka message",
            msg.offset()
        );
    };

    let mut metadata = Row::default();
    let mut packer = metadata.packer();
    for kind in metadata_columns {
        match kind {
            KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
            KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
            KafkaMetadataKind::Timestamp => {
                let ts = msg
                    .timestamp()
                    .to_millis()
                    .expect("kafka sources always have upstream_time");

                // Out-of-range timestamps map to NULL via the failed
                // conversion below.
                let d: Datum = DateTime::from_timestamp_millis(ts)
                    .and_then(|dt| {
                        let ct: Option<CheckedTimestamp<NaiveDateTime>> =
                            dt.naive_utc().try_into().ok();
                        ct
                    })
                    .into();
                packer.push(d)
            }
            KafkaMetadataKind::Header { key, use_bytes } => {
                match msg.headers() {
                    Some(headers) => {
                        // If the same header key appears multiple times, the
                        // last occurrence wins.
                        let d = headers
                            .iter()
                            .filter(|header| header.key == key)
                            .last()
                            .map(|header| match header.value {
                                Some(v) => {
                                    if *use_bytes {
                                        Ok(Datum::Bytes(v))
                                    } else {
                                        match str::from_utf8(v) {
                                            Ok(str) => Ok(Datum::String(str)),
                                            Err(_) => Err(KafkaHeaderParseError::Utf8Error {
                                                key: key.clone(),
                                                raw: v.to_vec(),
                                            }),
                                        }
                                    }
                                }
                                None => Ok(Datum::Null),
                            })
                            .unwrap_or_else(|| {
                                Err(KafkaHeaderParseError::KeyNotFound { key: key.clone() })
                            });
                        match d {
                            Ok(d) => packer.push(d),
                            // Bail out with the parse error; the partially
                            // packed metadata row is discarded by the caller.
                            Err(err) => return (Err(err), (pid, offset.into())),
                        }
                    }
                    None => packer.push(Datum::Null),
                }
            }
            KafkaMetadataKind::Headers => {
                // Pack all headers as a list of (key, value) records; a
                // missing value becomes NULL.
                packer.push_list_with(|r| {
                    if let Some(headers) = msg.headers() {
                        for header in headers.iter() {
                            match header.value {
                                Some(v) => r.push_list_with(|record_row| {
                                    record_row.push(Datum::String(header.key));
                                    record_row.push(Datum::Bytes(v));
                                }),
                                None => r.push_list_with(|record_row| {
                                    record_row.push(Datum::String(header.key));
                                    record_row.push(Datum::Null);
                                }),
                            }
                        }
                    }
                });
            }
        }
    }

    // Key and payload are passed through as raw bytes (or NULL when absent);
    // decoding happens downstream.
    let key = match msg.key() {
        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
        None => Row::pack([Datum::Null]),
    };
    let value = match msg.payload() {
        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
        None => Row::pack([Datum::Null]),
    };
    (
        Ok(SourceMessage {
            key,
            value,
            metadata,
        }),
        (pid, offset.into()),
    )
}
1289
/// Pairs a dedicated librdkafka [`PartitionQueue`] with the id of the
/// partition it serves, so polled messages can be tagged with their origin.
struct PartitionConsumer {
    /// The id of the partition this consumer reads from.
    pid: PartitionId,
    /// The split-out queue from which messages for this partition are polled.
    partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
}
1297
1298impl PartitionConsumer {
1299 fn new(
1301 pid: PartitionId,
1302 partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1303 ) -> Self {
1304 PartitionConsumer {
1305 pid,
1306 partition_queue,
1307 }
1308 }
1309
1310 fn get_next_message(&self) -> Result<Option<(BorrowedMessage<'_>, PartitionId)>, KafkaError> {
1317 match self.partition_queue.poll(Duration::from_millis(0)) {
1318 Some(Ok(msg)) => Ok(Some((msg, self.pid))),
1319 Some(Err(err)) => Err(err),
1320 _ => Ok(None),
1321 }
1322 }
1323
1324 fn pid(&self) -> PartitionId {
1326 self.pid
1327 }
1328}
1329
/// A [`ClientContext`] implementation that glues librdkafka callbacks to the
/// source operator: statistics are forwarded over a channel and the operator
/// is woken via a notificator.
struct GlueConsumerContext {
    /// Used to wake up the source operator when there is work to do.
    notificator: Arc<Notify>,
    /// Channel over which decoded librdkafka statistics are forwarded.
    stats_tx: crossbeam_channel::Sender<Jsonb>,
    /// The wrapped context that handles logging and error callbacks.
    inner: MzClientContext,
}
1337
1338impl ClientContext for GlueConsumerContext {
1339 fn stats_raw(&self, statistics: &[u8]) {
1340 match Jsonb::from_slice(statistics) {
1341 Ok(statistics) => {
1342 self.stats_tx
1343 .send(statistics)
1344 .expect("timely operator hung up while Kafka source active");
1345 self.activate();
1346 }
1347 Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
1348 };
1349 }
1350
1351 fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
1354 self.inner.log(level, fac, log_message)
1355 }
1356 fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
1357 self.inner.error(error, reason)
1358 }
1359}
1360
impl GlueConsumerContext {
    /// Wakes up the source operator waiting on the notificator.
    fn activate(&self) {
        self.notificator.notify_one();
    }
}
1366
1367impl ConsumerContext for GlueConsumerContext {}
1368
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use mz_kafka_util::client::create_new_client_config_simple;
    use rdkafka::consumer::{BaseConsumer, Consumer};
    use rdkafka::{Message, Offset, TopicPartitionList};
    use uuid::Uuid;

    /// Demonstrates that, after `split_partition_queue`, messages for the
    /// split-out partition can still arrive on the consumer's common queue.
    ///
    /// Requires a Kafka broker on `localhost:9092` whose `queue-test` topic
    /// already holds `expected_messages` messages — hence `#[ignore]`.
    #[mz_ore::test]
    #[ignore]
    fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
        let topic_name = "queue-test";
        let pid = 0;

        // A fresh consumer group plus a tiny fetch size, so messages arrive in
        // many small batches — presumably widening the window for the race.
        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
        kafka_config.set("enable.auto.commit", "false");
        kafka_config.set("group.id", Uuid::new_v4().to_string());
        kafka_config.set("fetch.message.max.bytes", "100");
        let consumer: BaseConsumer<_> = kafka_config.create()?;

        let consumer = Arc::new(consumer);

        let mut partition_list = TopicPartitionList::new();
        partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;

        consumer.assign(&partition_list)?;

        // Split the partition into its own queue; ideally all subsequent
        // messages for `pid` would be delivered only there.
        let partition_queue = consumer
            .split_partition_queue(topic_name, pid)
            .expect("missing partition queue");

        let expected_messages = 1_000;

        let mut common_queue_count = 0;
        let mut partition_queue_count = 0;

        loop {
            // Poll the common queue first. Seeing a message here after one
            // was already observed on the partition queue is a hard failure.
            if let Some(msg) = consumer.poll(Duration::from_millis(0)) {
                match msg {
                    Ok(msg) => {
                        let _payload =
                            std::str::from_utf8(msg.payload().expect("missing payload"))?;
                        if partition_queue_count > 0 {
                            anyhow::bail!(
                                "Got message from common queue after we internally switched to partition queue."
                            );
                        }

                        common_queue_count += 1;
                    }
                    Err(err) => anyhow::bail!("{}", err),
                }
            }

            // Then poll the dedicated partition queue.
            match partition_queue.poll(Duration::from_millis(0)) {
                Some(Ok(msg)) => {
                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
                    partition_queue_count += 1;
                }
                Some(Err(err)) => anyhow::bail!("{}", err),
                _ => (),
            }

            if (common_queue_count + partition_queue_count) == expected_messages {
                break;
            }
        }

        // The race manifests as one or more messages on the common queue.
        assert!(
            common_queue_count == 0,
            "Got {} out of {} messages from common queue. Partition queue: {}",
            common_queue_count,
            expected_messages,
            partition_queue_count
        );

        Ok(())
    }
}
1464
1465fn fetch_partition_info<C: ConsumerContext>(
1467 consumer: &BaseConsumer<C>,
1468 topic: &str,
1469 fetch_timeout: Duration,
1470) -> Result<BTreeMap<PartitionId, HighWatermark>, GetPartitionsError> {
1471 let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;
1472
1473 let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
1474 for pid in pids {
1475 offset_requests.add_partition_offset(topic, pid, Offset::End)?;
1476 }
1477
1478 let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;
1479
1480 let mut result = BTreeMap::new();
1481 for entry in offset_responses.elements() {
1482 let offset = match entry.offset() {
1483 Offset::Offset(offset) => offset,
1484 offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
1485 };
1486
1487 let pid = entry.partition();
1488 let watermark = offset.try_into().expect("invalid negative offset");
1489 result.insert(pid, watermark);
1490 }
1491
1492 Ok(result)
1493}
1494
/// An update produced by the metadata fetcher thread and consumed by the
/// metadata fetcher operator.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum MetadataUpdate {
    /// The currently known partitions, each with its high watermark.
    Partitions(BTreeMap<PartitionId, HighWatermark>),
    /// A transient error, surfaced as a health status change.
    TransientError(HealthStatus),
    /// A definite, unrecoverable source error.
    DefiniteError(SourceError),
}
1509
impl MetadataUpdate {
    /// Returns the upstream frontier implied by this update, if any.
    ///
    /// * `Partitions` yields one singleton frontier entry per known partition
    ///   at its high watermark, plus a range entry covering all partition ids
    ///   greater than the maximum known one at offset 0 (yet-unknown
    ///   partitions start at the beginning).
    /// * `DefiniteError` implies the source can make no further progress, so
    ///   the frontier is empty.
    /// * `TransientError` carries no frontier information (`None`).
    fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
        match self {
            Self::Partitions(partitions) => {
                // `partitions` is a `BTreeMap`, so the last key is the
                // greatest known partition id.
                let max_pid = partitions.keys().last().copied();
                let lower = max_pid
                    .map(RangeBound::after)
                    .unwrap_or(RangeBound::NegInfinity);
                // Range entry for all partitions beyond the known ones.
                let future_ts =
                    Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));

                let mut frontier = Antichain::from_elem(future_ts);
                for (pid, high_watermark) in partitions {
                    frontier.insert(Partitioned::new_singleton(
                        RangeBound::exact(*pid),
                        MzOffset::from(*high_watermark),
                    ));
                }

                Some(frontier)
            }
            Self::DefiniteError(_) => Some(Antichain::new()),
            Self::TransientError(_) => None,
        }
    }
}
1537
/// Errors that can occur while extracting a datum from a Kafka message header.
#[derive(Debug, thiserror::Error)]
pub enum KafkaHeaderParseError {
    /// The requested header key was absent from the message.
    #[error("A header with key '{key}' was not found in the message headers")]
    KeyNotFound { key: String },
    /// The header value was not valid UTF-8 but a text datum was requested.
    #[error(
        "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
    )]
    Utf8Error { key: String, raw: Vec<u8> },
}
1547
1548fn render_metadata_fetcher<'scope>(
1554 scope: Scope<'scope, KafkaTimestamp>,
1555 connection: KafkaSourceConnection,
1556 config: RawSourceCreationConfig,
1557) -> (
1558 StreamVec<'scope, KafkaTimestamp, (mz_repr::Timestamp, MetadataUpdate)>,
1559 StreamVec<'scope, KafkaTimestamp, Probe<KafkaTimestamp>>,
1560 PressOnDropButton,
1561) {
1562 let active_worker_id = usize::cast_from(config.id.hashed());
1563 let is_active_worker = active_worker_id % scope.peers() == scope.index();
1564
1565 let resume_upper = Antichain::from_iter(
1566 config
1567 .source_resume_uppers
1568 .values()
1569 .map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row))
1570 .flatten(),
1571 );
1572
1573 let name = format!("KafkaMetadataFetcher({})", config.id);
1574 let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
1575
1576 let (metadata_output, metadata_stream) =
1577 builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1578 let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1579
1580 let button = builder.build(move |caps| async move {
1581 if !is_active_worker {
1582 return;
1583 }
1584
1585 let [metadata_cap, probe_cap] = caps.try_into().unwrap();
1586
1587 let client_id = connection.client_id(
1588 config.config.config_set(),
1589 &config.config.connection_context,
1590 config.id,
1591 );
1592 let KafkaSourceConnection {
1593 connection,
1594 topic,
1595 topic_metadata_refresh_interval,
1596 ..
1597 } = connection;
1598
1599 let consumer: Result<BaseConsumer<_>, _> = connection
1600 .create_with_context(
1601 &config.config,
1602 MzClientContext::default(),
1603 &btreemap! {
1604 "topic.metadata.refresh.interval.ms" =>
1607 topic_metadata_refresh_interval
1608 .as_millis()
1609 .to_string(),
1610 "client.id" => format!("{client_id}-metadata"),
1613 },
1614 InTask::Yes,
1615 )
1616 .await;
1617
1618 let consumer = match consumer {
1619 Ok(consumer) => consumer,
1620 Err(e) => {
1621 let msg = format!(
1622 "failed creating kafka metadata consumer: {}",
1623 e.display_with_causes()
1624 );
1625 let status_update = HealthStatusUpdate::halting(msg, None);
1626 let status = match e {
1627 ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
1628 _ => HealthStatus::kafka(status_update),
1629 };
1630 let error = MetadataUpdate::TransientError(status);
1631 let timestamp = (config.now_fn)().into();
1632 metadata_output.give(&metadata_cap, (timestamp, error));
1633
1634 std::future::pending::<()>().await;
1638 unreachable!("pending future never returns");
1639 }
1640 };
1641
1642 let (tx, mut rx) = mpsc::unbounded_channel();
1643 spawn_metadata_thread(config, consumer, topic, tx);
1644
1645 let mut prev_upstream_frontier = resume_upper;
1646
1647 while let Some((timestamp, mut update)) = rx.recv().await {
1648 if prev_upstream_frontier.is_empty() {
1649 return;
1650 }
1651
1652 if let Some(upstream_frontier) = update.upstream_frontier() {
1653 if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
1663 let error = SourceError {
1664 error: SourceErrorDetails::Other("topic was recreated".into()),
1665 };
1666 update = MetadataUpdate::DefiniteError(error);
1667 }
1668 }
1669
1670 if let Some(upstream_frontier) = update.upstream_frontier() {
1671 prev_upstream_frontier = upstream_frontier.clone();
1672
1673 let probe = Probe {
1674 probe_ts: timestamp,
1675 upstream_frontier,
1676 };
1677 probe_output.give(&probe_cap, probe);
1678 }
1679
1680 metadata_output.give(&metadata_cap, (timestamp, update));
1681 }
1682 });
1683
1684 (metadata_stream, probe_stream, button.press_on_drop())
1685}
1686
1687fn spawn_metadata_thread<C: ConsumerContext>(
1688 config: RawSourceCreationConfig,
1689 consumer: BaseConsumer<TunnelingClientContext<C>>,
1690 topic: String,
1691 tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
1692) {
1693 thread::Builder::new()
1695 .name(format!("kfk-mtdt-{}", config.id))
1696 .spawn(move || {
1697 trace!(
1698 source_id = config.id.to_string(),
1699 worker_id = config.worker_id,
1700 num_workers = config.worker_count,
1701 "kafka metadata thread: starting..."
1702 );
1703
1704 let timestamp_interval = config.timestamp_interval;
1705 let mut ticker = probe::Ticker::new(move || timestamp_interval, config.now_fn);
1706
1707 loop {
1708 let probe_ts = ticker.tick_blocking();
1709 let result = fetch_partition_info(
1710 &consumer,
1711 &topic,
1712 config
1713 .config
1714 .parameters
1715 .kafka_timeout_config
1716 .fetch_metadata_timeout,
1717 );
1718 trace!(
1719 source_id = config.id.to_string(),
1720 worker_id = config.worker_id,
1721 num_workers = config.worker_count,
1722 "kafka metadata thread: metadata fetch result: {:?}",
1723 result
1724 );
1725 let update = match result {
1726 Ok(partitions) => {
1727 trace!(
1728 source_id = config.id.to_string(),
1729 worker_id = config.worker_id,
1730 num_workers = config.worker_count,
1731 "kafka metadata thread: fetched partition metadata info",
1732 );
1733
1734 MetadataUpdate::Partitions(partitions)
1735 }
1736 Err(GetPartitionsError::TopicDoesNotExist) => {
1737 let error = SourceError {
1738 error: SourceErrorDetails::Other("topic was deleted".into()),
1739 };
1740 MetadataUpdate::DefiniteError(error)
1741 }
1742 Err(e) => {
1743 let kafka_status = Some(HealthStatusUpdate::stalled(
1744 format!("{}", e.display_with_causes()),
1745 None,
1746 ));
1747
1748 let ssh_status = consumer.client().context().tunnel_status();
1749 let ssh_status = match ssh_status {
1750 SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
1751 SshTunnelStatus::Errored(e) => {
1752 Some(HealthStatusUpdate::stalled(e, None))
1753 }
1754 };
1755
1756 MetadataUpdate::TransientError(HealthStatus {
1757 kafka: kafka_status,
1758 ssh: ssh_status,
1759 })
1760 }
1761 };
1762
1763 if tx.send((probe_ts, update)).is_err() {
1764 break;
1765 }
1766 }
1767
1768 info!(
1769 source_id = config.id.to_string(),
1770 worker_id = config.worker_id,
1771 num_workers = config.worker_count,
1772 "kafka metadata thread: receiver has gone away; shutting down."
1773 )
1774 })
1775 .unwrap();
1776}