1use std::collections::BTreeMap;
11use std::collections::btree_map::Entry;
12use std::str::{self};
13use std::sync::Arc;
14use std::thread;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use chrono::{DateTime, NaiveDateTime};
19use differential_dataflow::{AsCollection, Hashable};
20use futures::StreamExt;
21use itertools::Itertools;
22use maplit::btreemap;
23use mz_kafka_util::client::{
24 GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
25};
26use mz_ore::assert_none;
27use mz_ore::cast::CastFrom;
28use mz_ore::error::ErrorExt;
29use mz_ore::future::InTask;
30use mz_ore::iter::IteratorExt;
31use mz_repr::adt::timestamp::CheckedTimestamp;
32use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
33use mz_ssh_util::tunnel::SshTunnelStatus;
34use mz_storage_types::errors::{
35 ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
36};
37use mz_storage_types::sources::kafka::{
38 KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
39};
40use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
41use mz_timely_util::antichain::AntichainExt;
42use mz_timely_util::builder_async::{
43 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
44};
45use mz_timely_util::containers::stack::FueledBuilder;
46use mz_timely_util::order::Partitioned;
47use rdkafka::consumer::base_consumer::PartitionQueue;
48use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
49use rdkafka::error::KafkaError;
50use rdkafka::message::{BorrowedMessage, Headers};
51use rdkafka::statistics::Statistics;
52use rdkafka::topic_partition_list::Offset;
53use rdkafka::{ClientContext, Message, TopicPartitionList};
54use serde::{Deserialize, Serialize};
55use timely::PartialOrder;
56use timely::container::CapacityContainerBuilder;
57use timely::dataflow::channels::pact::Pipeline;
58use timely::dataflow::operators::Capability;
59use timely::dataflow::operators::core::Partition;
60use timely::dataflow::operators::vec::Broadcast;
61use timely::dataflow::{Scope, StreamVec};
62use timely::progress::Antichain;
63use timely::progress::Timestamp;
64use tokio::sync::{Notify, mpsc};
65use tracing::{error, info, trace};
66
67use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
68use crate::metrics::source::kafka::KafkaSourceMetrics;
69use crate::source::types::{FuelSize, Probe, SignaledFuture, SourceRender, StackedCollection};
70use crate::source::{RawSourceCreationConfig, SourceMessage, probe};
71use crate::statistics::SourceStatistics;
72
73#[derive(
74 Clone,
75 Debug,
76 Default,
77 PartialEq,
78 Eq,
79 PartialOrd,
80 Ord,
81 Serialize,
82 Deserialize
83)]
84struct HealthStatus {
85 kafka: Option<HealthStatusUpdate>,
86 ssh: Option<HealthStatusUpdate>,
87}
88
89impl HealthStatus {
90 fn kafka(update: HealthStatusUpdate) -> Self {
91 Self {
92 kafka: Some(update),
93 ssh: None,
94 }
95 }
96
97 fn ssh(update: HealthStatusUpdate) -> Self {
98 Self {
99 kafka: None,
100 ssh: Some(update),
101 }
102 }
103}
104
/// Per-worker state needed to ingest data from a Kafka topic.
pub struct KafkaSourceReader {
    /// Name of the Kafka topic this source reads from.
    topic_name: String,
    /// Name of the source, used for logging and error messages.
    source_name: String,
    /// Global ID of the source.
    id: GlobalId,
    /// The shared Kafka consumer; per-partition queues are split off of it.
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    /// One consumer per partition this worker is responsible for.
    partition_consumers: Vec<PartitionConsumer>,
    /// The index of this timely worker.
    worker_id: usize,
    /// The total number of timely workers.
    worker_count: usize,
    /// For each output index, the last offset processed per partition. Used
    /// in `handle_message` to detect and skip already-seen offsets.
    last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
    /// The offset at which to start reading each partition (configured start
    /// offsets, possibly advanced by restored resume offsets).
    start_offsets: BTreeMap<PartitionId, i64>,
    /// Receiver for librdkafka statistics JSON forwarded by the client context.
    stats_rx: crossbeam_channel::Receiver<Jsonb>,
    /// Per-partition source metrics.
    partition_metrics: KafkaSourceMetrics,
    /// Capabilities used to downgrade the per-partition frontiers of the data
    /// output.
    partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
}
134
/// The capabilities held for a specific partition of the source.
struct PartitionCapability {
    /// The capability of the data output for this partition.
    data: Capability<KafkaTimestamp>,
}
139
/// The high watermark of a Kafka partition, i.e. the offset one past the last
/// available message.
type HighWatermark = u64;
144
/// Processes resume-upper frontiers by committing the corresponding offsets
/// back to Kafka and updating source statistics.
pub struct KafkaResumeUpperProcessor {
    // The source's creation config, used for worker responsibility checks and
    // task naming.
    config: RawSourceCreationConfig,
    // The topic to commit offsets for.
    topic_name: String,
    // The consumer through which offsets are committed.
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    // Statistics handles for all exports of this source; committed offsets are
    // recorded in each.
    statistics: Vec<SourceStatistics>,
}
153
154fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
158 let pid = usize::try_from(pid).expect("positive pid");
159 ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
160}
161
/// Information about a single output (source export) of this Kafka source.
struct SourceOutputInfo {
    /// The global ID of the export.
    id: GlobalId,
    /// The index of this export in the operator's partitioned output.
    output_index: usize,
    /// The frontier from which this export resumes ingestion.
    resume_upper: Antichain<KafkaTimestamp>,
    /// The metadata columns requested for this export, in order.
    metadata_columns: Vec<KafkaMetadataKind>,
}
168
impl SourceRender for KafkaSourceConnection {
    type Time = KafkaTimestamp;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;

    /// Renders the dataflow operators for a Kafka source: a metadata fetcher
    /// (partition counts / probes) and a reader that produces the actual data,
    /// then demultiplexes the reader's combined output into one collection per
    /// source export.
    fn render<'scope>(
        self,
        scope: Scope<'scope, KafkaTimestamp>,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<
            GlobalId,
            StackedCollection<'scope, KafkaTimestamp, Result<SourceMessage, DataflowError>>,
        >,
        StreamVec<'scope, KafkaTimestamp, HealthStatusMessage>,
        StreamVec<'scope, KafkaTimestamp, Probe<KafkaTimestamp>>,
        Vec<PressOnDropButton>,
    ) {
        // The metadata stream feeds the reader so it can learn about new
        // partitions and definite/transient metadata errors.
        let (metadata, probes, metadata_token) =
            render_metadata_fetcher(scope, self.clone(), config.clone());
        let (data, health, reader_token) = render_reader(
            scope,
            self,
            config.clone(),
            resume_uppers,
            metadata,
            start_signal,
        );

        // The reader emits `(output_index, message)` pairs; split them into
        // one stream per export, routed by the output index.
        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
            partition_count,
            |((output, data), time, diff)| {
                let output = u64::cast_from(output);
                (output, (data, time, diff))
            },
        );
        // `source_exports` iteration order matches the output indices assigned
        // in `render_reader`, so a positional zip pairs them up correctly.
        let mut data_collections = BTreeMap::new();
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        (
            data_collections,
            health,
            probes,
            vec![metadata_token, reader_token],
        )
    }
}
225
/// Renders the reader operator of a Kafka source.
///
/// The operator creates a Kafka consumer, splits off one partition queue per
/// partition this worker is responsible for, and repeatedly drains those
/// queues, emitting `(output_index, message)` pairs on the data output and
/// status updates on the health output. Metadata updates (partition counts,
/// errors) arrive on `metadata_stream`; resume frontiers arrive on
/// `resume_uppers` and are committed back to Kafka.
fn render_reader<'scope>(
    scope: Scope<'scope, KafkaTimestamp>,
    connection: KafkaSourceConnection,
    config: RawSourceCreationConfig,
    resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
    metadata_stream: StreamVec<'scope, KafkaTimestamp, (mz_repr::Timestamp, MetadataUpdate)>,
    start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
    StackedCollection<'scope, KafkaTimestamp, (usize, Result<SourceMessage, DataflowError>)>,
    StreamVec<'scope, KafkaTimestamp, HealthStatusMessage>,
    PressOnDropButton,
) {
    let name = format!("KafkaReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());

    let (data_output, stream) = builder.new_output::<FueledBuilder<_>>();
    let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();

    // The metadata stream is broadcast so every worker sees every update.
    let mut metadata_input = builder.new_disconnected_input(metadata_stream.broadcast(), Pipeline);

    let mut outputs = vec![];

    // Statistics handles for all exports, and separately for those exports
    // that are still snapshotting (resume upper at the minimum frontier).
    let mut all_export_stats = vec![];
    let mut snapshot_export_stats = vec![];
    for (idx, (id, export)) in config.source_exports.iter().enumerate() {
        let SourceExport {
            details,
            storage_metadata: _,
            data_config: _,
        } = export;
        let resume_upper = Antichain::from_iter(
            config
                .source_resume_uppers
                .get(id)
                .expect("all source exports must be present in source resume uppers")
                .iter()
                .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
        );

        let metadata_columns = match details {
            SourceExportDetails::Kafka(details) => details
                .metadata_columns
                .iter()
                .map(|(_name, kind)| kind.clone())
                .collect::<Vec<_>>(),
            _ => panic!("unexpected source export details: {:?}", details),
        };

        let statistics = config
            .statistics
            .get(id)
            .expect("statistics have been initialized")
            .clone();
        // An export whose resume upper is the minimum frontier has not emitted
        // anything yet and is therefore still snapshotting.
        if resume_upper.as_ref() == &[Partitioned::minimum()] {
            snapshot_export_stats.push(statistics.clone());
        }
        all_export_stats.push(statistics);

        let output = SourceOutputInfo {
            id: *id,
            resume_upper,
            output_index: idx,
            metadata_columns,
        };
        outputs.push(output);
    }

    let busy_signal = Arc::clone(&config.busy_signal);
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut data_cap, health_cap] = caps.try_into().unwrap();

            let client_id = connection.client_id(
                config.config.config_set(),
                &config.config.connection_context,
                config.id,
            );
            let group_id = connection.group_id(&config.config.connection_context, config.id);
            let KafkaSourceConnection {
                connection,
                topic,
                topic_metadata_refresh_interval,
                start_offsets,
                metadata_columns: _,
                connection_id: _, group_id_prefix: _, } = connection;

            // Restrict the configured start offsets to the partitions this
            // worker is responsible for.
            // NOTE(review): the `.clone()` and the identity `.map` below look
            // redundant — `start_offsets` is owned here.
            let mut start_offsets: BTreeMap<_, i64> = start_offsets
                .clone()
                .into_iter()
                .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
                .map(|(k, v)| (k, v))
                .collect();

            let mut partition_capabilities = BTreeMap::new();
            let mut max_pid = None;
            // The combined resume upper of all exports determines where
            // ingestion restarts.
            let resume_upper = Antichain::from_iter(
                outputs
                    .iter()
                    .map(|output| output.resume_upper.clone())
                    .flatten(),
            );

            // For each partition present in the resume upper, advance the
            // start offset to the restored offset (whichever is larger) and
            // mint a capability at the restored timestamp.
            for ts in resume_upper.elements() {
                if let Some(pid) = ts.interval().singleton() {
                    let pid = pid.unwrap_exact();
                    max_pid = std::cmp::max(max_pid, Some(*pid));
                    if responsible_for_pid(&config, *pid) {
                        let restored_offset = i64::try_from(ts.timestamp().offset)
                            .expect("restored kafka offsets must fit into i64");
                        if let Some(start_offset) = start_offsets.get_mut(pid) {
                            *start_offset = std::cmp::max(restored_offset, *start_offset);
                        } else {
                            start_offsets.insert(*pid, restored_offset);
                        }

                        let part_ts = Partitioned::new_singleton(
                            RangeBound::exact(*pid),
                            ts.timestamp().clone(),
                        );
                        let part_cap = PartitionCapability {
                            data: data_cap.delayed(&part_ts),
                        };
                        partition_capabilities.insert(*pid, part_cap);
                    }
                }
            }
            // Hold the main capability at the range covering all partitions
            // beyond the maximum known one, so future partitions can be
            // ingested from offset zero.
            let lower = max_pid
                .map(RangeBound::after)
                .unwrap_or(RangeBound::NegInfinity);
            let future_ts =
                Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
            data_cap.downgrade(&future_ts);

            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "instantiating Kafka source reader at offsets {start_offsets:?}"
            );

            let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
            let notificator = Arc::new(Notify::new());

            let consumer: Result<BaseConsumer<_>, _> = connection
                .create_with_context(
                    &config.config,
                    GlueConsumerContext {
                        notificator: Arc::clone(&notificator),
                        stats_tx,
                        inner: MzClientContext::default(),
                    },
                    &btreemap! {
                        // Offsets are committed explicitly by the
                        // `KafkaResumeUpperProcessor`, never automatically.
                        "enable.auto.commit" => "false".into(),
                        "auto.offset.reset" => "earliest".into(),
                        // How often partition metadata is refreshed.
                        "topic.metadata.refresh.interval.ms" =>
                            topic_metadata_refresh_interval
                                .as_millis()
                                .to_string(),
                        "fetch.message.max.bytes" => "134217728".into(),
                        "group.id" => group_id.clone(),
                        "client.id" => client_id.clone(),
                    },
                    InTask::Yes,
                )
                .await;

            let consumer = match consumer {
                Ok(consumer) => Arc::new(consumer),
                Err(e) => {
                    // Report the failure on the health output (both at the
                    // source level and per export) and then suspend forever;
                    // the `halting` status is expected to trigger a restart.
                    let update = HealthStatusUpdate::halting(
                        format!(
                            "failed creating kafka reader consumer: {}",
                            e.display_with_causes()
                        ),
                        None,
                    );
                    health_output.give(
                        &health_cap,
                        HealthStatusMessage {
                            id: None,
                            namespace: if matches!(e, ContextCreationError::Ssh(_)) {
                                StatusNamespace::Ssh
                            } else {
                                StatusNamespace::Kafka
                            },
                            update: update.clone(),
                        },
                    );
                    for (output, update) in outputs.iter().repeat_clone(update) {
                        health_output.give(
                            &health_cap,
                            HealthStatusMessage {
                                id: Some(output.id),
                                namespace: if matches!(e, ContextCreationError::Ssh(_)) {
                                    StatusNamespace::Ssh
                                } else {
                                    StatusNamespace::Kafka
                                },
                                update,
                            },
                        );
                    }
                    std::future::pending::<()>().await;
                    unreachable!("pending future never returns");
                }
            };

            // Wait for the signal that it is safe to start reading (e.g.
            // rehydration of dependent state has finished).
            start_signal.await;
            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka worker noticed rehydration is finished, starting partition queues..."
            );

            let partition_ids = start_offsets.keys().copied().collect();
            let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);

            let mut reader = KafkaSourceReader {
                topic_name: topic.clone(),
                source_name: config.name.clone(),
                id: config.id,
                partition_consumers: Vec::new(),
                consumer: Arc::clone(&consumer),
                worker_id: config.worker_id,
                worker_count: config.worker_count,
                last_offsets: outputs
                    .iter()
                    .map(|output| (output.output_index, BTreeMap::new()))
                    .collect(),
                start_offsets,
                stats_rx,
                partition_metrics: config.metrics.get_kafka_source_metrics(
                    partition_ids,
                    topic.clone(),
                    config.id,
                ),
                partition_capabilities,
            };

            let offset_committer = KafkaResumeUpperProcessor {
                config: config.clone(),
                topic_name: topic.clone(),
                consumer,
                statistics: all_export_stats.clone(),
            };

            // If any export is snapshotting, eagerly commit the restored
            // resume upper once and reset snapshot statistics to zero.
            if !snapshot_export_stats.is_empty() {
                if let Err(e) = offset_committer
                    .process_frontier(resume_upper.clone())
                    .await
                {
                    offset_commit_metrics.offset_commit_failures.inc();
                    tracing::warn!(
                        %e,
                        "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
                        worker_id = config.worker_id,
                        source_id = config.id,
                        upper = resume_upper.pretty()
                    );
                }
                for statistics in config.statistics.values() {
                    statistics.set_snapshot_records_known(0);
                    statistics.set_snapshot_records_staged(0);
                }
            }

            // Background task (polled from the select loop below) that
            // commits every new resume frontier back to Kafka.
            let resume_uppers_process_loop = async move {
                tokio::pin!(resume_uppers);
                while let Some(frontier) = resume_uppers.next().await {
                    if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
                        offset_commit_metrics.offset_commit_failures.inc();
                        tracing::warn!(
                            %e,
                            "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
                            worker_id = config.worker_id,
                            source_id = config.id,
                            upper = frontier.pretty()
                        );
                    }
                }
                // Never return, so the select arm below never resolves once
                // the stream of resume uppers ends.
                std::future::pending::<()>().await;
            };
            tokio::pin!(resume_uppers_process_loop);

            let mut metadata_update: Option<MetadataUpdate> = None;
            let mut snapshot_total = None;

            let max_wait_time =
                mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
            loop {
                tokio::select! {
                    // Wake up when notified of new data, or after the maximum
                    // wait time at the latest.
                    _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},

                    _ = metadata_input.ready() => {
                        // Only the most recent metadata update matters.
                        let mut updates = Vec::new();
                        while let Some(event) = metadata_input.next_sync() {
                            if let Event::Data(_, mut data) = event {
                                updates.append(&mut data);
                            }
                        }
                        metadata_update = updates
                            .into_iter()
                            .max_by_key(|(ts, _)| *ts)
                            .map(|(_, update)| update);
                    }

                    // Drive the offset-commit loop forward.
                    _ = resume_uppers_process_loop.as_mut() => {},
                }

                match metadata_update.take() {
                    Some(MetadataUpdate::Partitions(partitions)) => {
                        // Recompute the "future partitions" range from the
                        // newly learned maximum partition id.
                        let max_pid = partitions.keys().last().cloned();
                        let lower = max_pid
                            .map(RangeBound::after)
                            .unwrap_or(RangeBound::NegInfinity);
                        let future_ts = Partitioned::new_range(
                            lower,
                            RangeBound::PosInfinity,
                            MzOffset::from(0),
                        );

                        // Install queues and capabilities for any partitions
                        // this worker is responsible for but hasn't seen yet,
                        // and total up the known high watermarks.
                        let mut offset_known = 0;
                        for (&pid, &high_watermark) in &partitions {
                            if responsible_for_pid(&config, pid) {
                                offset_known += high_watermark;
                                reader.ensure_partition(pid);
                                if let Entry::Vacant(entry) =
                                    reader.partition_capabilities.entry(pid)
                                {
                                    let start_offset = match reader.start_offsets.get(&pid) {
                                        Some(&offset) => offset.try_into().unwrap(),
                                        None => 0u64,
                                    };
                                    let part_since_ts = Partitioned::new_singleton(
                                        RangeBound::exact(pid),
                                        MzOffset::from(start_offset),
                                    );

                                    entry.insert(PartitionCapability {
                                        data: data_cap.delayed(&part_since_ts),
                                    });
                                }
                            }
                        }

                        // The first partition update fixes the snapshot size.
                        if !snapshot_export_stats.is_empty() && snapshot_total.is_none() {
                            snapshot_total = Some(offset_known);
                        }

                        // A successful metadata fetch means both the Kafka
                        // connection and any SSH tunnel are healthy.
                        for output in &outputs {
                            for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace,
                                        update: HealthStatusUpdate::running(),
                                    },
                                );
                            }
                        }
                        for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace,
                                    update: HealthStatusUpdate::running(),
                                },
                            );
                        }

                        for export_stat in all_export_stats.iter() {
                            export_stat.set_offset_known(offset_known);
                        }

                        data_cap.downgrade(&future_ts);
                    }
                    Some(MetadataUpdate::TransientError(status)) => {
                        // Surface transient errors per namespace; ingestion
                        // continues.
                        if let Some(update) = status.kafka {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Kafka,
                                    update: update.clone(),
                                },
                            );
                            for (output, update) in outputs.iter().repeat_clone(update) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Kafka,
                                        update,
                                    },
                                );
                            }
                        }
                        if let Some(update) = status.ssh {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Ssh,
                                    update: update.clone(),
                                },
                            );
                            for (output, update) in outputs.iter().repeat_clone(update) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Ssh,
                                        update,
                                    },
                                );
                            }
                        }
                    }
                    Some(MetadataUpdate::DefiniteError(error)) => {
                        // A definite error is permanent: report it, emit it
                        // into the data stream for every output, and shut
                        // down the operator.
                        health_output.give(
                            &health_cap,
                            HealthStatusMessage {
                                id: None,
                                namespace: StatusNamespace::Kafka,
                                update: HealthStatusUpdate::stalled(
                                    error.to_string(),
                                    None,
                                ),
                            },
                        );
                        let error = Err(error.into());
                        let time = data_cap.time().clone();
                        for (output, error) in
                            outputs.iter().map(|o| o.output_index).repeat_clone(error)
                        {
                            let update = ((output, error), time, Diff::ONE);
                            let size = update.fuel_size();
                            data_output
                                .give_fueled(&data_cap, update, size)
                                .await;
                        }

                        return;
                    }
                    None => {}
                }

                // Poll the main consumer. Messages normally arrive on the
                // split partition queues, so anything surfacing here is
                // handled defensively; errors are reported as stalls.
                while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
                    match result {
                        Err(e) => {
                            let error = format!(
                                "kafka error when polling consumer for source: {} topic: {} : {}",
                                reader.source_name, reader.topic_name, e
                            );
                            let status = HealthStatusUpdate::stalled(error, None);
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Kafka,
                                    update: status.clone(),
                                },
                            );
                            for (output, status) in outputs.iter().repeat_clone(status) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Kafka,
                                        update: status,
                                    },
                                );
                            }
                        }
                        Ok(message) => {
                            let output_messages = outputs
                                .iter()
                                .map(|output| {
                                    let (message, ts) = construct_source_message(
                                        &message,
                                        &output.metadata_columns,
                                    );
                                    (output.output_index, message, ts)
                                })
                                .collect::<Vec<_>>();
                            for (output_index, message, ts) in output_messages {
                                if let Some((msg, time, diff)) =
                                    reader.handle_message(message, ts, &output_index)
                                {
                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
                                    let part_cap = &reader.partition_capabilities[pid].data;
                                    let msg = msg.map_err(|e| {
                                        DataflowError::SourceError(Box::new(SourceError {
                                            error: SourceErrorDetails::Other(e.to_string().into()),
                                        }))
                                    });
                                    let update = ((output_index, msg), time, diff);
                                    let size = update.fuel_size();
                                    data_output
                                        .give_fueled(&data_cap, update, size)
                                        .await;
                                }
                            }
                        }
                    }
                }

                reader.update_stats();

                // Drain each partition queue, bounded per partition per
                // iteration so one busy partition can't starve the others.
                // Temporarily take the consumers out of `reader` so we can
                // call `&mut reader` methods while iterating.
                let mut consumers = std::mem::take(&mut reader.partition_consumers);
                for consumer in consumers.iter_mut() {
                    let pid = consumer.pid();
                    let mut partition_exhausted = false;
                    for _ in 0..10_000 {
                        let Some(message) = consumer.get_next_message().transpose() else {
                            partition_exhausted = true;
                            break;
                        };

                        for output in outputs.iter() {
                            let message = match &message {
                                Ok((msg, pid)) => {
                                    let (msg, ts) =
                                        construct_source_message(msg, &output.metadata_columns);
                                    assert_eq!(*pid, ts.0);
                                    Ok(reader.handle_message(msg, ts, &output.output_index))
                                }
                                Err(err) => Err(err),
                            };
                            match message {
                                Ok(Some((msg, time, diff))) => {
                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
                                    let part_cap = &reader.partition_capabilities[pid].data;
                                    let msg = msg.map_err(|e| {
                                        DataflowError::SourceError(Box::new(SourceError {
                                            error: SourceErrorDetails::Other(e.to_string().into()),
                                        }))
                                    });
                                    let update =
                                        ((output.output_index, msg), time, diff);
                                    let size = update.fuel_size();
                                    data_output
                                        .give_fueled(part_cap, update, size)
                                        .await;
                                }
                                // `None` means the message was a duplicate
                                // (already-seen offset) and was skipped.
                                Ok(None) => continue,
                                Err(err) => {
                                    let last_offset = reader
                                        .last_offsets
                                        .get(&output.output_index)
                                        .expect("output known to be installed")
                                        .get(&pid)
                                        .expect("partition known to be installed");

                                    let status = HealthStatusUpdate::stalled(
                                        format!(
                                            "error consuming from source: {} topic: {topic}:\
                                            partition: {pid} last processed offset:\
                                            {last_offset} : {err}",
                                            config.name
                                        ),
                                        None,
                                    );
                                    health_output.give(
                                        &health_cap,
                                        HealthStatusMessage {
                                            id: None,
                                            namespace: StatusNamespace::Kafka,
                                            update: status.clone(),
                                        },
                                    );
                                    health_output.give(
                                        &health_cap,
                                        HealthStatusMessage {
                                            id: Some(output.id),
                                            namespace: StatusNamespace::Kafka,
                                            update: status,
                                        },
                                    );
                                }
                            }
                        }
                    }
                    // If the fuel ran out before the queue was drained,
                    // re-notify ourselves to continue promptly next iteration.
                    if !partition_exhausted {
                        notificator.notify_one();
                    }
                }
                // `ensure_partition` must not have run while the consumers
                // were taken out, so nothing was pushed in the meantime.
                assert!(reader.partition_consumers.is_empty());
                reader.partition_consumers = consumers;

                // Downgrade the per-partition frontiers to the consumer's
                // current positions and track snapshot progress.
                let positions = reader.consumer.position().unwrap();
                let topic_positions = positions.elements_for_topic(&reader.topic_name);
                let mut snapshot_staged = 0;

                for position in topic_positions {
                    if let Offset::Offset(offset) = position.offset() {
                        let pid = position.partition();
                        let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
                        let upper =
                            Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);

                        let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
                        match part_cap.data.try_downgrade(&upper) {
                            Ok(()) => {
                                if !snapshot_export_stats.is_empty() {
                                    snapshot_staged += offset.try_into().unwrap_or(0u64);
                                    // Never report staged > known, which can
                                    // transiently happen as the topic grows.
                                    if let Some(snapshot_total) = snapshot_total {
                                        snapshot_staged =
                                            std::cmp::min(snapshot_staged, snapshot_total);
                                    }
                                }
                            }
                            Err(_) => {
                                info!(
                                    source_id = config.id.to_string(),
                                    worker_id = config.worker_id,
                                    num_workers = config.worker_count,
                                    "kafka source frontier downgrade skipped due to already \
                                    seen offset: {:?}",
                                    upper
                                );
                            }
                        };
                    }
                }

                // Publish snapshot progress; once staged catches up to the
                // total the snapshot is complete and reporting stops.
                if let (Some(snapshot_total), true) =
                    (snapshot_total, !snapshot_export_stats.is_empty())
                {
                    for export_stat in snapshot_export_stats.iter() {
                        export_stat.set_snapshot_records_known(snapshot_total);
                        export_stat.set_snapshot_records_staged(snapshot_staged);
                    }
                    if snapshot_total == snapshot_staged {
                        snapshot_export_stats.clear();
                    }
                }
            }
        })
    });

    (
        stream.as_collection(),
        health_stream,
        button.press_on_drop(),
    )
}
961
962impl KafkaResumeUpperProcessor {
963 async fn process_frontier(
964 &self,
965 frontier: Antichain<KafkaTimestamp>,
966 ) -> Result<(), anyhow::Error> {
967 use rdkafka::consumer::CommitMode;
968
969 let mut offsets = vec![];
971 let mut offset_committed = 0;
972 for ts in frontier.iter() {
973 if let Some(pid) = ts.interval().singleton() {
974 let pid = pid.unwrap_exact();
975 if responsible_for_pid(&self.config, *pid) {
976 offsets.push((pid.clone(), *ts.timestamp()));
977
978 offset_committed += ts.timestamp().offset;
983 }
984 }
985 }
986
987 for export_stat in self.statistics.iter() {
988 export_stat.set_offset_committed(offset_committed);
989 }
990
991 if !offsets.is_empty() {
992 let mut tpl = TopicPartitionList::new();
993 for (pid, offset) in offsets {
994 let offset_to_commit =
995 Offset::Offset(offset.offset.try_into().expect("offset to be vald i64"));
996 tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
997 .expect("offset known to be valid");
998 }
999 let consumer = Arc::clone(&self.consumer);
1000 mz_ore::task::spawn_blocking(
1001 || format!("source({}) kafka offset commit", self.config.id),
1002 move || consumer.commit(&tpl, CommitMode::Sync),
1003 )
1004 .await?;
1005 }
1006 Ok(())
1007 }
1008}
1009
impl KafkaSourceReader {
    /// Ensures that a partition queue for `pid` exists, creating one (and
    /// seeding the last-offset bookkeeping for every output) if it does not.
    fn ensure_partition(&mut self, pid: PartitionId) {
        // If there are no outputs there is nothing to ingest into.
        if self.last_offsets.is_empty() {
            tracing::info!(
                source_id = %self.id,
                worker_id = %self.worker_id,
                "kafka source does not have any outputs, not creating partition queue");

            return;
        }
        // Partitions are installed in all outputs at once, so checking any
        // output suffices to detect an already-installed partition.
        for last_offsets in self.last_offsets.values() {
            if last_offsets.contains_key(&pid) {
                return;
            }
        }

        let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
        self.create_partition_queue(pid, Offset::Offset(start_offset));

        // The "last processed offset" starts one before the start offset so
        // the first message at `start_offset` is accepted.
        for last_offsets in self.last_offsets.values_mut() {
            let prev = last_offsets.insert(pid, start_offset - 1);
            assert_none!(prev);
        }
    }

    /// Creates a new partition queue for `partition_id`, starting at
    /// `initial_offset`, and adds it to the consumer's assignment.
    fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
        info!(
            source_id = self.id.to_string(),
            worker_id = self.worker_id,
            num_workers = self.worker_count,
            "activating Kafka queue for topic {}, partition {}",
            self.topic_name,
            partition_id,
        );

        // Rebuild the assignment as the current assignment plus the new
        // partition; `assign` replaces the whole assignment.
        let tpl = self.consumer.assignment().unwrap();
        let mut partition_list = TopicPartitionList::new();
        for partition in tpl.elements_for_topic(&self.topic_name) {
            partition_list
                .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
                .expect("offset known to be valid");
        }
        partition_list
            .add_partition_offset(&self.topic_name, partition_id, initial_offset)
            .expect("offset known to be valid");
        self.consumer
            .assign(&partition_list)
            .expect("assignment known to be valid");

        // Re-split the partition queues of all existing consumers.
        // NOTE(review): this appears to be needed because `assign` invalidates
        // previously split queues — confirm against rdkafka semantics.
        let context = Arc::clone(self.consumer.context());
        for pc in &mut self.partition_consumers {
            pc.partition_queue = self
                .consumer
                .split_partition_queue(&self.topic_name, pc.pid)
                .expect("partition known to be valid");
            // Wake the source operator whenever the queue becomes non-empty.
            pc.partition_queue.set_nonempty_callback({
                let context = Arc::clone(&context);
                move || context.inner().activate()
            });
        }

        let mut partition_queue = self
            .consumer
            .split_partition_queue(&self.topic_name, partition_id)
            .expect("partition known to be valid");
        partition_queue.set_nonempty_callback(move || context.inner().activate());
        self.partition_consumers
            .push(PartitionConsumer::new(partition_id, partition_queue));
        // Sanity check: one partition consumer per assigned partition.
        assert_eq!(
            self.consumer
                .assignment()
                .unwrap()
                .elements_for_topic(&self.topic_name)
                .len(),
            self.partition_consumers.len()
        );
    }

    /// Drains any pending librdkafka statistics and updates the per-partition
    /// high-watermark metrics from them.
    fn update_stats(&mut self) {
        while let Ok(stats) = self.stats_rx.try_recv() {
            match serde_json::from_str::<Statistics>(&stats.to_string()) {
                Ok(statistics) => {
                    let topic = statistics.topics.get(&self.topic_name);
                    match topic {
                        Some(topic) => {
                            for (id, partition) in &topic.partitions {
                                self.partition_metrics
                                    .set_offset_max(*id, partition.hi_offset);
                            }
                        }
                        None => error!("No stats found for topic: {}", &self.topic_name),
                    }
                }
                Err(e) => {
                    error!("failed decoding librdkafka statistics JSON: {}", e);
                }
            }
        }
    }

    /// Checks a message against the last processed offset for
    /// `(output_index, partition)`.
    ///
    /// Returns `None` if the message's offset was already processed (e.g.
    /// redelivered after a queue rebuild), logging the occurrence. Otherwise
    /// records the new offset and returns the message together with its
    /// timestamp and a `+1` diff, ready for emission.
    fn handle_message(
        &mut self,
        message: Result<SourceMessage, KafkaHeaderParseError>,
        (partition, offset): (PartitionId, MzOffset),
        output_index: &usize,
    ) -> Option<(
        Result<SourceMessage, KafkaHeaderParseError>,
        KafkaTimestamp,
        Diff,
    )> {
        // The partition must have been installed via `ensure_partition`.
        assert!(
            self.last_offsets
                .get(output_index)
                .unwrap()
                .contains_key(&partition)
        );

        let last_offset_ref = self
            .last_offsets
            .get_mut(output_index)
            .expect("output known to be installed")
            .get_mut(&partition)
            .expect("partition known to be installed");

        let last_offset = *last_offset_ref;
        let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
        if offset_as_i64 <= last_offset {
            info!(
                source_id = self.id.to_string(),
                worker_id = self.worker_id,
                num_workers = self.worker_count,
                "kafka message before expected offset: \
                source {} (reading topic {}, partition {}, output {}) \
                received offset {} expected offset {:?}",
                self.source_name,
                self.topic_name,
                partition,
                output_index,
                offset.offset,
                last_offset + 1,
            );
            None
        } else {
            *last_offset_ref = offset_as_i64;

            let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
            Some((message, ts, Diff::ONE))
        }
    }
}
1183
/// Converts a raw Kafka message into a `SourceMessage`, packing the requested
/// metadata columns, and returns it together with its `(partition, offset)`
/// position.
///
/// Header extraction can fail (missing key, invalid UTF-8), in which case the
/// message slot carries a `KafkaHeaderParseError` instead.
fn construct_source_message(
    msg: &BorrowedMessage<'_>,
    metadata_columns: &[KafkaMetadataKind],
) -> (
    Result<SourceMessage, KafkaHeaderParseError>,
    (PartitionId, MzOffset),
) {
    let pid = msg.partition();
    // Non-error'd messages must have non-negative offsets.
    let Ok(offset) = u64::try_from(msg.offset()) else {
        panic!(
            "got negative offset ({}) from otherwise non-error'd kafka message",
            msg.offset()
        );
    };

    // Pack the requested metadata columns, in order, into a single row.
    let mut metadata = Row::default();
    let mut packer = metadata.packer();
    for kind in metadata_columns {
        match kind {
            KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
            KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
            KafkaMetadataKind::Timestamp => {
                let ts = msg
                    .timestamp()
                    .to_millis()
                    .expect("kafka sources always have upstream_time");

                let d: Datum = DateTime::from_timestamp_millis(ts)
                    .and_then(|dt| {
                        let ct: Option<CheckedTimestamp<NaiveDateTime>> =
                            dt.naive_utc().try_into().ok();
                        ct
                    })
                    .into();
                packer.push(d)
            }
            KafkaMetadataKind::Header { key, use_bytes } => {
                match msg.headers() {
                    Some(headers) => {
                        // If the same key appears multiple times, the last
                        // occurrence wins.
                        let d = headers
                            .iter()
                            .filter(|header| header.key == key)
                            .last()
                            .map(|header| match header.value {
                                Some(v) => {
                                    if *use_bytes {
                                        Ok(Datum::Bytes(v))
                                    } else {
                                        match str::from_utf8(v) {
                                            Ok(str) => Ok(Datum::String(str)),
                                            Err(_) => Err(KafkaHeaderParseError::Utf8Error {
                                                key: key.clone(),
                                                raw: v.to_vec(),
                                            }),
                                        }
                                    }
                                }
                                // A header present with no value maps to NULL.
                                None => Ok(Datum::Null),
                            })
                            .unwrap_or_else(|| {
                                Err(KafkaHeaderParseError::KeyNotFound { key: key.clone() })
                            });
                        match d {
                            Ok(d) => packer.push(d),
                            // Bail out early on a header error; the position
                            // is still reported so the caller can account for
                            // the message.
                            Err(err) => return (Err(err), (pid, offset.into())),
                        }
                    }
                    None => packer.push(Datum::Null),
                }
            }
            KafkaMetadataKind::Headers => {
                // All headers as a list of (key, value) records; values are
                // raw bytes or NULL.
                packer.push_list_with(|r| {
                    if let Some(headers) = msg.headers() {
                        for header in headers.iter() {
                            match header.value {
                                Some(v) => r.push_list_with(|record_row| {
                                    record_row.push(Datum::String(header.key));
                                    record_row.push(Datum::Bytes(v));
                                }),
                                None => r.push_list_with(|record_row| {
                                    record_row.push(Datum::String(header.key));
                                    record_row.push(Datum::Null);
                                }),
                            }
                        }
                    }
                });
            }
        }
    }

    // Key and value are passed through as raw bytes; absent key/payload maps
    // to NULL.
    let key = match msg.key() {
        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
        None => Row::pack([Datum::Null]),
    };
    let value = match msg.payload() {
        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
        None => Row::pack([Datum::Null]),
    };
    (
        Ok(SourceMessage {
            key,
            value,
            metadata,
        }),
        (pid, offset.into()),
    )
}
1293
/// Pairs a split-off Kafka partition queue with the partition id it serves,
/// so messages polled from it can be attributed to that partition.
struct PartitionConsumer {
    /// The partition id with which this consumer is associated.
    pid: PartitionId,
    /// The underlying partition queue, split off from the base consumer.
    partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
}
1301
1302impl PartitionConsumer {
1303 fn new(
1305 pid: PartitionId,
1306 partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1307 ) -> Self {
1308 PartitionConsumer {
1309 pid,
1310 partition_queue,
1311 }
1312 }
1313
1314 fn get_next_message(&self) -> Result<Option<(BorrowedMessage<'_>, PartitionId)>, KafkaError> {
1321 match self.partition_queue.poll(Duration::from_millis(0)) {
1322 Some(Ok(msg)) => Ok(Some((msg, self.pid))),
1323 Some(Err(err)) => Err(err),
1324 _ => Ok(None),
1325 }
1326 }
1327
1328 fn pid(&self) -> PartitionId {
1330 self.pid
1331 }
1332}
1333
/// A client context that forwards librdkafka statistics to the source
/// operator and wakes it up via `notificator` when new statistics arrive.
struct GlueConsumerContext {
    /// Notified in `activate` to wake up the source operator.
    notificator: Arc<Notify>,
    /// Channel carrying decoded librdkafka statistics to the operator.
    stats_tx: crossbeam_channel::Sender<Jsonb>,
    /// The wrapped default Materialize client context, to which logging and
    /// error callbacks are delegated.
    inner: MzClientContext,
}
1341
1342impl ClientContext for GlueConsumerContext {
1343 fn stats_raw(&self, statistics: &[u8]) {
1344 match Jsonb::from_slice(statistics) {
1345 Ok(statistics) => {
1346 self.stats_tx
1347 .send(statistics)
1348 .expect("timely operator hung up while Kafka source active");
1349 self.activate();
1350 }
1351 Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
1352 };
1353 }
1354
1355 fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
1358 self.inner.log(level, fac, log_message)
1359 }
1360 fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
1361 self.inner.error(error, reason)
1362 }
1363}
1364
impl GlueConsumerContext {
    /// Wakes up the source operator by notifying its `Notify` handle.
    fn activate(&self) {
        self.notificator.notify_one();
    }
}
1370
1371impl ConsumerContext for GlueConsumerContext {}
1372
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use mz_kafka_util::client::create_new_client_config_simple;
    use rdkafka::consumer::{BaseConsumer, Consumer};
    use rdkafka::{Message, Offset, TopicPartitionList};
    use uuid::Uuid;

    /// Demonstrates a librdkafka behavior in which messages for a partition
    /// can still be delivered on the common consumer queue after
    /// `split_partition_queue` has returned a dedicated queue for that
    /// partition.
    ///
    /// Requires an external Kafka broker at `localhost:9092` with a
    /// `queue-test` topic pre-populated with 1000 messages, which is why it
    /// is `#[ignore]`d by default.
    #[mz_ore::test]
    #[ignore]
    fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
        let topic_name = "queue-test";
        let pid = 0;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
        kafka_config.set("enable.auto.commit", "false");
        kafka_config.set("group.id", Uuid::new_v4().to_string());
        // Keep fetches tiny so the broker responds in many small batches,
        // making the race easier to observe.
        kafka_config.set("fetch.message.max.bytes", "100");
        let consumer: BaseConsumer<_> = kafka_config.create()?;

        let consumer = Arc::new(consumer);

        let mut partition_list = TopicPartitionList::new();
        partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;

        consumer.assign(&partition_list)?;

        let partition_queue = consumer
            .split_partition_queue(topic_name, pid)
            .expect("missing partition queue");

        let expected_messages = 1_000;

        let mut common_queue_count = 0;
        let mut partition_queue_count = 0;

        loop {
            // Poll the common queue. A message arriving here after the
            // partition queue has already produced one is the race.
            if let Some(msg) = consumer.poll(Duration::from_millis(0)) {
                match msg {
                    Ok(msg) => {
                        let _payload =
                            std::str::from_utf8(msg.payload().expect("missing payload"))?;
                        if partition_queue_count > 0 {
                            anyhow::bail!(
                                "Got message from common queue after we internally switched to partition queue."
                            );
                        }

                        common_queue_count += 1;
                    }
                    Err(err) => anyhow::bail!("{}", err),
                }
            }

            match partition_queue.poll(Duration::from_millis(0)) {
                Some(Ok(msg)) => {
                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
                    partition_queue_count += 1;
                }
                Some(Err(err)) => anyhow::bail!("{}", err),
                _ => (),
            }

            if (common_queue_count + partition_queue_count) == expected_messages {
                break;
            }
        }

        // `assert_eq!` is the idiomatic form of `assert!(a == b, ..)` and
        // additionally prints both operands on failure.
        assert_eq!(
            common_queue_count, 0,
            "Got {} out of {} messages from common queue. Partition queue: {}",
            common_queue_count, expected_messages, partition_queue_count
        );

        Ok(())
    }
}
1468
1469fn fetch_partition_info<C: ConsumerContext>(
1471 consumer: &BaseConsumer<C>,
1472 topic: &str,
1473 fetch_timeout: Duration,
1474) -> Result<BTreeMap<PartitionId, HighWatermark>, GetPartitionsError> {
1475 let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;
1476
1477 let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
1478 for pid in pids {
1479 offset_requests.add_partition_offset(topic, pid, Offset::End)?;
1480 }
1481
1482 let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;
1483
1484 let mut result = BTreeMap::new();
1485 for entry in offset_responses.elements() {
1486 let offset = match entry.offset() {
1487 Offset::Offset(offset) => offset,
1488 offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
1489 };
1490
1491 let pid = entry.partition();
1492 let watermark = offset.try_into().expect("invalid negative offset");
1493 result.insert(pid, watermark);
1494 }
1495
1496 Ok(result)
1497}
1498
/// An update produced by the metadata fetcher: either fresh partition
/// metadata or an error condition.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum MetadataUpdate {
    /// The currently known partitions and their high watermark offsets.
    Partitions(BTreeMap<PartitionId, HighWatermark>),
    /// A transient error, carried as a health status report.
    TransientError(HealthStatus),
    /// A definite, unrecoverable source error.
    DefiniteError(SourceError),
}
1513
1514impl MetadataUpdate {
1515 fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
1517 match self {
1518 Self::Partitions(partitions) => {
1519 let max_pid = partitions.keys().last().copied();
1520 let lower = max_pid
1521 .map(RangeBound::after)
1522 .unwrap_or(RangeBound::NegInfinity);
1523 let future_ts =
1524 Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
1525
1526 let mut frontier = Antichain::from_elem(future_ts);
1527 for (pid, high_watermark) in partitions {
1528 frontier.insert(Partitioned::new_singleton(
1529 RangeBound::exact(*pid),
1530 MzOffset::from(*high_watermark),
1531 ));
1532 }
1533
1534 Some(frontier)
1535 }
1536 Self::DefiniteError(_) => Some(Antichain::new()),
1537 Self::TransientError(_) => None,
1538 }
1539 }
1540}
1541
/// Errors that can occur while extracting a Kafka message header as a
/// metadata column.
#[derive(Debug, thiserror::Error)]
pub enum KafkaHeaderParseError {
    /// The requested header key was absent from the message's headers.
    #[error("A header with key '{key}' was not found in the message headers")]
    KeyNotFound { key: String },
    /// The header value was requested as text but is not valid UTF-8.
    #[error(
        "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
    )]
    Utf8Error { key: String, raw: Vec<u8> },
}
1551
1552fn render_metadata_fetcher<'scope>(
1558 scope: Scope<'scope, KafkaTimestamp>,
1559 connection: KafkaSourceConnection,
1560 config: RawSourceCreationConfig,
1561) -> (
1562 StreamVec<'scope, KafkaTimestamp, (mz_repr::Timestamp, MetadataUpdate)>,
1563 StreamVec<'scope, KafkaTimestamp, Probe<KafkaTimestamp>>,
1564 PressOnDropButton,
1565) {
1566 let active_worker_id = usize::cast_from(config.id.hashed());
1567 let is_active_worker = active_worker_id % scope.peers() == scope.index();
1568
1569 let resume_upper = Antichain::from_iter(
1570 config
1571 .source_resume_uppers
1572 .values()
1573 .map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row))
1574 .flatten(),
1575 );
1576
1577 let name = format!("KafkaMetadataFetcher({})", config.id);
1578 let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
1579
1580 let (metadata_output, metadata_stream) =
1581 builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1582 let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1583
1584 let button = builder.build(move |caps| async move {
1585 if !is_active_worker {
1586 return;
1587 }
1588
1589 let [metadata_cap, probe_cap] = caps.try_into().unwrap();
1590
1591 let client_id = connection.client_id(
1592 config.config.config_set(),
1593 &config.config.connection_context,
1594 config.id,
1595 );
1596 let KafkaSourceConnection {
1597 connection,
1598 topic,
1599 topic_metadata_refresh_interval,
1600 ..
1601 } = connection;
1602
1603 let consumer: Result<BaseConsumer<_>, _> = connection
1604 .create_with_context(
1605 &config.config,
1606 MzClientContext::default(),
1607 &btreemap! {
1608 "topic.metadata.refresh.interval.ms" =>
1611 topic_metadata_refresh_interval
1612 .as_millis()
1613 .to_string(),
1614 "client.id" => format!("{client_id}-metadata"),
1617 },
1618 InTask::Yes,
1619 )
1620 .await;
1621
1622 let consumer = match consumer {
1623 Ok(consumer) => consumer,
1624 Err(e) => {
1625 let msg = format!(
1626 "failed creating kafka metadata consumer: {}",
1627 e.display_with_causes()
1628 );
1629 let status_update = HealthStatusUpdate::halting(msg, None);
1630 let status = match e {
1631 ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
1632 _ => HealthStatus::kafka(status_update),
1633 };
1634 let error = MetadataUpdate::TransientError(status);
1635 let timestamp = (config.now_fn)().into();
1636 metadata_output.give(&metadata_cap, (timestamp, error));
1637
1638 std::future::pending::<()>().await;
1642 unreachable!("pending future never returns");
1643 }
1644 };
1645
1646 let (tx, mut rx) = mpsc::unbounded_channel();
1647 spawn_metadata_thread(config, consumer, topic, tx);
1648
1649 let mut prev_upstream_frontier = resume_upper;
1650
1651 while let Some((timestamp, mut update)) = rx.recv().await {
1652 if prev_upstream_frontier.is_empty() {
1653 return;
1654 }
1655
1656 if let Some(upstream_frontier) = update.upstream_frontier() {
1657 if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
1667 let error = SourceError {
1668 error: SourceErrorDetails::Other("topic was recreated".into()),
1669 };
1670 update = MetadataUpdate::DefiniteError(error);
1671 }
1672 }
1673
1674 if let Some(upstream_frontier) = update.upstream_frontier() {
1675 prev_upstream_frontier = upstream_frontier.clone();
1676
1677 let probe = Probe {
1678 probe_ts: timestamp,
1679 upstream_frontier,
1680 };
1681 probe_output.give(&probe_cap, probe);
1682 }
1683
1684 metadata_output.give(&metadata_cap, (timestamp, update));
1685 }
1686 });
1687
1688 (metadata_stream, probe_stream, button.press_on_drop())
1689}
1690
/// Spawns a dedicated thread that periodically fetches partition metadata
/// for `topic` using `consumer`, converting each fetch result into a
/// `MetadataUpdate` and sending it (tagged with a probe timestamp) over
/// `tx`. The thread shuts down when the receiving end of `tx` is dropped.
fn spawn_metadata_thread<C: ConsumerContext>(
    config: RawSourceCreationConfig,
    consumer: BaseConsumer<TunnelingClientContext<C>>,
    topic: String,
    tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
) {
    thread::Builder::new()
        // NOTE(review): abbreviated name presumably to fit OS thread-name
        // length limits — confirm before renaming.
        .name(format!("kfk-mtdt-{}", config.id))
        .spawn(move || {
            trace!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka metadata thread: starting..."
            );

            // Tick at the source's timestamp interval; ticks provide the
            // probe timestamps attached to each update.
            let timestamp_interval = config.timestamp_interval;
            let mut ticker = probe::Ticker::new(move || timestamp_interval, config.now_fn);

            loop {
                let probe_ts = ticker.tick_blocking();
                let result = fetch_partition_info(
                    &consumer,
                    &topic,
                    config
                        .config
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                );
                trace!(
                    source_id = config.id.to_string(),
                    worker_id = config.worker_id,
                    num_workers = config.worker_count,
                    "kafka metadata thread: metadata fetch result: {:?}",
                    result
                );
                let update = match result {
                    Ok(partitions) => {
                        trace!(
                            source_id = config.id.to_string(),
                            worker_id = config.worker_id,
                            num_workers = config.worker_count,
                            "kafka metadata thread: fetched partition metadata info",
                        );

                        MetadataUpdate::Partitions(partitions)
                    }
                    // A vanished topic is a definite, unrecoverable error.
                    Err(GetPartitionsError::TopicDoesNotExist) => {
                        let error = SourceError {
                            error: SourceErrorDetails::Other("topic was deleted".into()),
                        };
                        MetadataUpdate::DefiniteError(error)
                    }
                    // Everything else is reported as a transient health
                    // stall, including the current SSH tunnel status.
                    Err(e) => {
                        let kafka_status = Some(HealthStatusUpdate::stalled(
                            format!("{}", e.display_with_causes()),
                            None,
                        ));

                        let ssh_status = consumer.client().context().tunnel_status();
                        let ssh_status = match ssh_status {
                            SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
                            SshTunnelStatus::Errored(e) => {
                                Some(HealthStatusUpdate::stalled(e, None))
                            }
                        };

                        MetadataUpdate::TransientError(HealthStatus {
                            kafka: kafka_status,
                            ssh: ssh_status,
                        })
                    }
                };

                // A send failure means the operator dropped the receiver;
                // exit the loop and shut down.
                if tx.send((probe_ts, update)).is_err() {
                    break;
                }
            }

            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka metadata thread: receiver has gone away; shutting down."
            )
        })
        .unwrap();
}