use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::convert::Infallible;
use std::str::{self};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use anyhow::anyhow;
use chrono::{DateTime, NaiveDateTime};
use differential_dataflow::{AsCollection, Hashable};
use futures::StreamExt;
use itertools::Itertools;
use maplit::btreemap;
use mz_kafka_util::client::{
    GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
use mz_ssh_util::tunnel::SshTunnelStatus;
use mz_storage_types::dyncfgs::KAFKA_METADATA_FETCH_INTERVAL;
use mz_storage_types::errors::{
    ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
};
use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use mz_timely_util::order::Partitioned;
use rdkafka::consumer::base_consumer::PartitionQueue;
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
use rdkafka::error::KafkaError;
use rdkafka::message::{BorrowedMessage, Headers};
use rdkafka::statistics::Statistics;
use rdkafka::topic_partition_list::Offset;
use rdkafka::{ClientContext, Message, TopicPartitionList};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::core::Partition;
use timely::dataflow::operators::{Broadcast, Capability};
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use timely::progress::Timestamp;
use tokio::sync::{Notify, mpsc};
use tracing::{error, info, trace};

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::metrics::source::kafka::KafkaSourceMetrics;
use crate::source::types::{Probe, SignaledFuture, SourceRender, StackedCollection};
use crate::source::{RawSourceCreationConfig, SourceMessage, probe};
use crate::statistics::SourceStatistics;

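/// Health status updates for this source, tracked separately for the Kafka and SSH namespaces.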
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
struct HealthStatus {
    kafka: Option<HealthStatusUpdate>,
    ssh: Option<HealthStatusUpdate>,
}

impl HealthStatus {
    fn kafka(update: HealthStatusUpdate) -> Self {
        Self {
            kafka: Some(update),
            ssh: None,
        }
    }

    fn ssh(update: HealthStatusUpdate) -> Self {
        Self {
            kafka: None,
            ssh: Some(update),
        }
    }
}

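/// Per-worker state for reading from a Kafka topic.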
pub struct KafkaSourceReader {
    topic_name: String,
    source_name: String,
    id: GlobalId,
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    partition_consumers: Vec<PartitionConsumer>,
    worker_id: usize,
    worker_count: usize,
    last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
    start_offsets: BTreeMap<PartitionId, i64>,
    stats_rx: crossbeam_channel::Receiver<Jsonb>,
    partition_metrics: KafkaSourceMetrics,
    partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
}

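/// The capabilities this worker holds for a single partition.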
struct PartitionCapability {
    data: Capability<KafkaTimestamp>,
    progress: Capability<KafkaTimestamp>,
}

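/// The high watermark offset of a Kafka partition, i.e. the offset one past the last available
/// message.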
type HighWatermark = u64;

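/// Commits resume upper frontiers back to Kafka as consumer group offsets.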
pub struct KafkaResumeUpperProcessor {
    config: RawSourceCreationConfig,
    topic_name: String,
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    statistics: Vec<SourceStatistics>,
}

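/// Returns whether this worker is responsible for the given partition. Partitions are assigned
/// to workers in a round-robin fashion, offset by the worker responsible for the source id
/// itself.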
fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
    let pid = usize::try_from(pid).expect("positive pid");
    ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
}

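/// Rendering information for a single source export.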
struct SourceOutputInfo {
    id: GlobalId,
    output_index: usize,
    resume_upper: Antichain<KafkaTimestamp>,
    metadata_columns: Vec<KafkaMetadataKind>,
}

impl SourceRender for KafkaSourceConnection {
    type Time = KafkaTimestamp;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;

    fn render<G: Scope<Timestamp = KafkaTimestamp>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Option<Stream<G, Probe<KafkaTimestamp>>>,
        Vec<PressOnDropButton>,
    ) {
        let (metadata, probes, metadata_token) =
            render_metadata_fetcher(scope, self.clone(), config.clone());
        let (data, progress, health, reader_token) = render_reader(
            scope,
            self,
            config.clone(),
            resume_uppers,
            metadata,
            start_signal,
        );

        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
            partition_count,
            |((output, data), time, diff): &(
                (usize, Result<SourceMessage, DataflowError>),
                _,
                Diff,
            )| {
                let output = u64::cast_from(*output);
                (output, (data.clone(), time.clone(), diff.clone()))
            },
        );
        let mut data_collections = BTreeMap::new();
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        (
            data_collections,
            progress,
            health,
            Some(probes),
            vec![metadata_token, reader_token],
        )
    }
}

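/// Renders the reader operator of a Kafka source: it polls the consumer, routes decoded
/// messages to the appropriate source export, and maintains the per-partition data and
/// progress capabilities.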
fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
    scope: &G,
    connection: KafkaSourceConnection,
    config: RawSourceCreationConfig,
    resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
    metadata_stream: Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
    start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
) {
    let name = format!("KafkaReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());

    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (health_output, health_stream) = builder.new_output();

    let mut metadata_input = builder.new_disconnected_input(&metadata_stream.broadcast(), Pipeline);

    let mut outputs = vec![];

    let mut all_export_stats = vec![];
    let mut snapshot_export_stats = vec![];
    for (idx, (id, export)) in config.source_exports.iter().enumerate() {
        let SourceExport {
            details,
            storage_metadata: _,
            data_config: _,
        } = export;
        let resume_upper = Antichain::from_iter(
            config
                .source_resume_uppers
                .get(id)
                .expect("all source exports must be present in source resume uppers")
                .iter()
                .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
        );

        let metadata_columns = match details {
            SourceExportDetails::Kafka(details) => details
                .metadata_columns
                .iter()
                .map(|(_name, kind)| kind.clone())
                .collect::<Vec<_>>(),
            _ => panic!("unexpected source export details: {:?}", details),
        };

        let statistics = config
            .statistics
            .get(id)
            .expect("statistics have been initialized")
            .clone();
        if resume_upper.as_ref() == &[Partitioned::minimum()] {
            snapshot_export_stats.push(statistics.clone());
        }
        all_export_stats.push(statistics);

        let output = SourceOutputInfo {
            id: *id,
            resume_upper,
            output_index: idx,
            metadata_columns,
        };
        outputs.push(output);
    }

    let busy_signal = Arc::clone(&config.busy_signal);
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut data_cap, mut progress_cap, health_cap] = caps.try_into().unwrap();

            let client_id = connection.client_id(
                config.config.config_set(),
                &config.config.connection_context,
                config.id,
            );
            let group_id = connection.group_id(&config.config.connection_context, config.id);
            let KafkaSourceConnection {
                connection,
                topic,
                topic_metadata_refresh_interval,
                start_offsets,
                metadata_columns: _,
                connection_id: _,
                group_id_prefix: _,
            } = connection;

            let mut start_offsets: BTreeMap<_, i64> = start_offsets
                .clone()
                .into_iter()
                .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
                .map(|(k, v)| (k, v))
                .collect();

            let mut partition_capabilities = BTreeMap::new();
            let mut max_pid = None;
            let resume_upper = Antichain::from_iter(
                outputs
                    .iter()
                    .map(|output| output.resume_upper.clone())
                    .flatten(),
            );

            for ts in resume_upper.elements() {
                if let Some(pid) = ts.interval().singleton() {
                    let pid = pid.unwrap_exact();
                    max_pid = std::cmp::max(max_pid, Some(*pid));
                    if responsible_for_pid(&config, *pid) {
                        let restored_offset = i64::try_from(ts.timestamp().offset)
                            .expect("restored kafka offsets must fit into i64");
                        if let Some(start_offset) = start_offsets.get_mut(pid) {
                            *start_offset = std::cmp::max(restored_offset, *start_offset);
                        } else {
                            start_offsets.insert(*pid, restored_offset);
                        }

                        let part_ts = Partitioned::new_singleton(
                            RangeBound::exact(*pid),
                            ts.timestamp().clone(),
                        );
                        let part_cap = PartitionCapability {
                            data: data_cap.delayed(&part_ts),
                            progress: progress_cap.delayed(&part_ts),
                        };
                        partition_capabilities.insert(*pid, part_cap);
                    }
                }
            }
            let lower = max_pid
                .map(RangeBound::after)
                .unwrap_or(RangeBound::NegInfinity);
            let future_ts =
                Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
            data_cap.downgrade(&future_ts);
            progress_cap.downgrade(&future_ts);

            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "instantiating Kafka source reader at offsets {start_offsets:?}"
            );

            let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
            let notificator = Arc::new(Notify::new());

            let consumer: Result<BaseConsumer<_>, _> = connection
                .create_with_context(
                    &config.config,
                    GlueConsumerContext {
                        notificator: Arc::clone(&notificator),
                        stats_tx,
                        inner: MzClientContext::default(),
                    },
                    &btreemap! {
                        "enable.auto.commit" => "false".into(),
                        "auto.offset.reset" => "earliest".into(),
                        "topic.metadata.refresh.interval.ms" =>
                            topic_metadata_refresh_interval
                                .as_millis()
                                .to_string(),
                        "fetch.message.max.bytes" => "134217728".into(),
                        "group.id" => group_id.clone(),
                        "client.id" => client_id.clone(),
                    },
                    InTask::Yes,
                )
                .await;

            let consumer = match consumer {
                Ok(consumer) => Arc::new(consumer),
                Err(e) => {
                    let update = HealthStatusUpdate::halting(
                        format!(
                            "failed creating kafka reader consumer: {}",
                            e.display_with_causes()
                        ),
                        None,
                    );
                    health_output.give(
                        &health_cap,
                        HealthStatusMessage {
                            id: None,
                            namespace: if matches!(e, ContextCreationError::Ssh(_)) {
                                StatusNamespace::Ssh
                            } else {
                                StatusNamespace::Kafka
                            },
                            update: update.clone(),
                        },
                    );
                    for (output, update) in outputs.iter().repeat_clone(update) {
                        health_output.give(
                            &health_cap,
                            HealthStatusMessage {
                                id: Some(output.id),
                                namespace: if matches!(e, ContextCreationError::Ssh(_)) {
                                    StatusNamespace::Ssh
                                } else {
                                    StatusNamespace::Kafka
                                },
                                update,
                            },
                        );
                    }
                    // Stall forever; the halting status emitted above reports the failure.
                    std::future::pending::<()>().await;
                    unreachable!("pending future never returns");
                }
            };

            start_signal.await;
            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka worker noticed rehydration is finished, starting partition queues..."
            );

            let partition_ids = start_offsets.keys().copied().collect();
            let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);

            let mut reader = KafkaSourceReader {
                topic_name: topic.clone(),
                source_name: config.name.clone(),
                id: config.id,
                partition_consumers: Vec::new(),
                consumer: Arc::clone(&consumer),
                worker_id: config.worker_id,
                worker_count: config.worker_count,
                last_offsets: outputs
                    .iter()
                    .map(|output| (output.output_index, BTreeMap::new()))
                    .collect(),
                start_offsets,
                stats_rx,
                partition_metrics: config.metrics.get_kafka_source_metrics(
                    partition_ids,
                    topic.clone(),
                    config.id,
                ),
                partition_capabilities,
            };

            let offset_committer = KafkaResumeUpperProcessor {
                config: config.clone(),
                topic_name: topic.clone(),
                consumer,
                statistics: all_export_stats.clone(),
            };

            if !snapshot_export_stats.is_empty() {
                if let Err(e) = offset_committer
                    .process_frontier(resume_upper.clone())
                    .await
                {
                    offset_commit_metrics.offset_commit_failures.inc();
                    tracing::warn!(
                        %e,
                        "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
                        worker_id = config.worker_id,
                        source_id = config.id,
                        upper = resume_upper.pretty()
                    );
                }
                for statistics in config.statistics.values() {
                    statistics.set_snapshot_records_known(0);
                    statistics.set_snapshot_records_staged(0);
                }
            }

            let resume_uppers_process_loop = async move {
                tokio::pin!(resume_uppers);
                while let Some(frontier) = resume_uppers.next().await {
                    if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
                        offset_commit_metrics.offset_commit_failures.inc();
                        tracing::warn!(
                            %e,
                            "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
                            worker_id = config.worker_id,
                            source_id = config.id,
                            upper = frontier.pretty()
                        );
                    }
                }
                // Ensure this future never completes, so the select! loop below can keep
                // polling it after the resume upper stream ends.
                std::future::pending::<()>().await;
            };
            tokio::pin!(resume_uppers_process_loop);

            let mut metadata_update: Option<MetadataUpdate> = None;
            let mut snapshot_total = None;

            let max_wait_time =
                mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
            loop {
                tokio::select! {
                    _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},

                    _ = metadata_input.ready() => {
                        let mut updates = Vec::new();
                        while let Some(event) = metadata_input.next_sync() {
                            if let Event::Data(_, mut data) = event {
                                updates.append(&mut data);
                            }
                        }
                        metadata_update = updates
                            .into_iter()
                            .max_by_key(|(ts, _)| *ts)
                            .map(|(_, update)| update);
                    }

                    _ = resume_uppers_process_loop.as_mut() => {},
                }

                match metadata_update.take() {
                    Some(MetadataUpdate::Partitions(partitions)) => {
                        let max_pid = partitions.keys().last().cloned();
                        let lower = max_pid
                            .map(RangeBound::after)
                            .unwrap_or(RangeBound::NegInfinity);
                        let future_ts = Partitioned::new_range(
                            lower,
                            RangeBound::PosInfinity,
                            MzOffset::from(0),
                        );

                        let mut offset_known = 0;
                        for (&pid, &high_watermark) in &partitions {
                            if responsible_for_pid(&config, pid) {
                                offset_known += high_watermark;
                                reader.ensure_partition(pid);
                                if let Entry::Vacant(entry) =
                                    reader.partition_capabilities.entry(pid)
                                {
                                    let start_offset = match reader.start_offsets.get(&pid) {
                                        Some(&offset) => offset.try_into().unwrap(),
                                        None => 0u64,
                                    };
                                    let part_since_ts = Partitioned::new_singleton(
                                        RangeBound::exact(pid),
                                        MzOffset::from(start_offset),
                                    );
                                    let part_upper_ts = Partitioned::new_singleton(
                                        RangeBound::exact(pid),
                                        MzOffset::from(high_watermark),
                                    );

                                    entry.insert(PartitionCapability {
                                        data: data_cap.delayed(&part_since_ts),
                                        progress: progress_cap.delayed(&part_upper_ts),
                                    });
                                }
                            }
                        }

                        if !snapshot_export_stats.is_empty() && snapshot_total.is_none() {
                            snapshot_total = Some(offset_known);
                        }

                        for output in &outputs {
                            for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace,
                                        update: HealthStatusUpdate::running(),
                                    },
                                );
                            }
                        }
                        for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace,
                                    update: HealthStatusUpdate::running(),
                                },
                            );
                        }

                        for export_stat in all_export_stats.iter() {
                            export_stat.set_offset_known(offset_known);
                        }

                        data_cap.downgrade(&future_ts);
                        progress_cap.downgrade(&future_ts);
                    }
                    Some(MetadataUpdate::TransientError(status)) => {
                        if let Some(update) = status.kafka {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Kafka,
                                    update: update.clone(),
                                },
                            );
                            for (output, update) in outputs.iter().repeat_clone(update) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Kafka,
                                        update,
                                    },
                                );
                            }
                        }
                        if let Some(update) = status.ssh {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Ssh,
                                    update: update.clone(),
                                },
                            );
                            for (output, update) in outputs.iter().repeat_clone(update) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Ssh,
                                        update,
                                    },
                                );
                            }
                        }
                    }
                    Some(MetadataUpdate::DefiniteError(error)) => {
                        health_output.give(
                            &health_cap,
                            HealthStatusMessage {
                                id: None,
                                namespace: StatusNamespace::Kafka,
                                update: HealthStatusUpdate::stalled(
                                    error.to_string(),
                                    None,
                                ),
                            },
                        );
                        let error = Err(error.into());
                        let time = data_cap.time().clone();
                        for (output, error) in
                            outputs.iter().map(|o| o.output_index).repeat_clone(error)
                        {
                            data_output
                                .give_fueled(&data_cap, ((output, error), time, Diff::ONE))
                                .await;
                        }

                        return;
                    }
                    None => {}
                }

                while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
                    match result {
                        Err(e) => {
                            let error = format!(
                                "kafka error when polling consumer for source: {} topic: {} : {}",
                                reader.source_name, reader.topic_name, e
                            );
                            let status = HealthStatusUpdate::stalled(error, None);
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Kafka,
                                    update: status.clone(),
                                },
                            );
                            for (output, status) in outputs.iter().repeat_clone(status) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Kafka,
                                        update: status,
                                    },
                                );
                            }
                        }
                        Ok(message) => {
                            let output_messages = outputs
                                .iter()
                                .map(|output| {
                                    let (message, ts) = construct_source_message(
                                        &message,
                                        &output.metadata_columns,
                                    );
                                    (output.output_index, message, ts)
                                })
                                .collect::<Vec<_>>();
                            for (output_index, message, ts) in output_messages {
                                if let Some((msg, time, diff)) =
                                    reader.handle_message(message, ts, &output_index)
                                {
                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
                                    let part_cap = &reader.partition_capabilities[pid].data;
                                    let msg = msg.map_err(|e| {
                                        DataflowError::SourceError(Box::new(SourceError {
                                            error: SourceErrorDetails::Other(e.to_string().into()),
                                        }))
                                    });
                                    data_output
                                        .give_fueled(part_cap, ((output_index, msg), time, diff))
                                        .await;
                                }
                            }
                        }
                    }
                }

                reader.update_stats();

                let mut consumers = std::mem::take(&mut reader.partition_consumers);
                for consumer in consumers.iter_mut() {
                    let pid = consumer.pid();
                    let mut partition_exhausted = false;
                    for _ in 0..10_000 {
                        let Some(message) = consumer.get_next_message().transpose() else {
                            partition_exhausted = true;
                            break;
                        };

                        for output in outputs.iter() {
                            let message = match &message {
                                Ok((msg, pid)) => {
                                    let (msg, ts) =
                                        construct_source_message(msg, &output.metadata_columns);
                                    assert_eq!(*pid, ts.0);
                                    Ok(reader.handle_message(msg, ts, &output.output_index))
                                }
                                Err(err) => Err(err),
                            };
                            match message {
                                Ok(Some((msg, time, diff))) => {
                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
                                    let part_cap = &reader.partition_capabilities[pid].data;
                                    let msg = msg.map_err(|e| {
                                        DataflowError::SourceError(Box::new(SourceError {
                                            error: SourceErrorDetails::Other(e.to_string().into()),
                                        }))
                                    });
                                    data_output
                                        .give_fueled(
                                            part_cap,
                                            ((output.output_index, msg), time, diff),
                                        )
                                        .await;
                                }
                                Ok(None) => continue,
                                Err(err) => {
                                    let last_offset = reader
                                        .last_offsets
                                        .get(&output.output_index)
                                        .expect("output known to be installed")
                                        .get(&pid)
                                        .expect("partition known to be installed");

                                    let status = HealthStatusUpdate::stalled(
                                        format!(
                                            "error consuming from source: {} topic: {topic}:\
                                            partition: {pid} last processed offset:\
                                            {last_offset} : {err}",
                                            config.name
                                        ),
                                        None,
                                    );
                                    health_output.give(
                                        &health_cap,
                                        HealthStatusMessage {
                                            id: None,
                                            namespace: StatusNamespace::Kafka,
                                            update: status.clone(),
                                        },
                                    );
                                    health_output.give(
                                        &health_cap,
                                        HealthStatusMessage {
                                            id: Some(output.id),
                                            namespace: StatusNamespace::Kafka,
                                            update: status,
                                        },
                                    );
                                }
                            }
                        }
                    }
                    if !partition_exhausted {
                        notificator.notify_one();
                    }
                }
                assert!(reader.partition_consumers.is_empty());
                reader.partition_consumers = consumers;

                let positions = reader.consumer.position().unwrap();
                let topic_positions = positions.elements_for_topic(&reader.topic_name);
                let mut snapshot_staged = 0;

                for position in topic_positions {
                    if let Offset::Offset(offset) = position.offset() {
                        let pid = position.partition();
                        let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
                        let upper =
                            Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);

                        let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
                        match part_cap.data.try_downgrade(&upper) {
                            Ok(()) => {
                                if !snapshot_export_stats.is_empty() {
                                    snapshot_staged += offset.try_into().unwrap_or(0u64);
                                    if let Some(snapshot_total) = snapshot_total {
                                        snapshot_staged =
                                            std::cmp::min(snapshot_staged, snapshot_total);
                                    }
                                }
                            }
                            Err(_) => {
                                info!(
                                    source_id = config.id.to_string(),
                                    worker_id = config.worker_id,
                                    num_workers = config.worker_count,
                                    "kafka source frontier downgrade skipped due to already \
                                    seen offset: {:?}",
                                    upper
                                );
                            }
                        };

                        let _ = part_cap.progress.try_downgrade(&upper);
                    }
                }

                if let (Some(snapshot_total), true) =
                    (snapshot_total, !snapshot_export_stats.is_empty())
                {
                    for export_stat in snapshot_export_stats.iter() {
                        export_stat.set_snapshot_records_known(snapshot_total);
                        export_stat.set_snapshot_records_staged(snapshot_staged);
                    }
                    if snapshot_total == snapshot_staged {
                        snapshot_export_stats.clear();
                    }
                }
            }
        })
    });

    (
        stream.as_collection(),
        progress_stream,
        health_stream,
        button.press_on_drop(),
    )
}

impl KafkaResumeUpperProcessor {
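    /// Commits the offsets described by the given resume frontier to Kafka and updates the
    /// `offset_committed` statistic of every export.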
    async fn process_frontier(
        &self,
        frontier: Antichain<KafkaTimestamp>,
    ) -> Result<(), anyhow::Error> {
        use rdkafka::consumer::CommitMode;

        let mut offsets = vec![];
        let mut offset_committed = 0;
        for ts in frontier.iter() {
            if let Some(pid) = ts.interval().singleton() {
                let pid = pid.unwrap_exact();
                if responsible_for_pid(&self.config, *pid) {
                    offsets.push((pid.clone(), *ts.timestamp()));

                    offset_committed += ts.timestamp().offset;
                }
            }
        }

        for export_stat in self.statistics.iter() {
            export_stat.set_offset_committed(offset_committed);
        }

        if !offsets.is_empty() {
            let mut tpl = TopicPartitionList::new();
            for (pid, offset) in offsets {
                let offset_to_commit =
                    Offset::Offset(offset.offset.try_into().expect("offset to be valid i64"));
                tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
                    .expect("offset known to be valid");
            }
            let consumer = Arc::clone(&self.consumer);
            mz_ore::task::spawn_blocking(
                || format!("source({}) kafka offset commit", self.config.id),
                move || consumer.commit(&tpl, CommitMode::Sync),
            )
            .await??;
        }
        Ok(())
    }
}

impl KafkaSourceReader {
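    /// Ensures that a partition queue for `pid` exists and that every output tracks its last
    /// consumed offset.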
    fn ensure_partition(&mut self, pid: PartitionId) {
        if self.last_offsets.is_empty() {
            tracing::info!(
                source_id = %self.id,
                worker_id = %self.worker_id,
                "kafka source does not have any outputs, not creating partition queue");

            return;
        }
        for last_offsets in self.last_offsets.values() {
            if last_offsets.contains_key(&pid) {
                return;
            }
        }

        let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
        self.create_partition_queue(pid, Offset::Offset(start_offset));

        for last_offsets in self.last_offsets.values_mut() {
            let prev = last_offsets.insert(pid, start_offset - 1);
            assert_none!(prev);
        }
    }

    fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
        info!(
            source_id = self.id.to_string(),
            worker_id = self.worker_id,
            num_workers = self.worker_count,
            "activating Kafka queue for topic {}, partition {}",
            self.topic_name,
            partition_id,
        );

        let tpl = self.consumer.assignment().unwrap();
        let mut partition_list = TopicPartitionList::new();
        for partition in tpl.elements_for_topic(&self.topic_name) {
            partition_list
                .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
                .expect("offset known to be valid");
        }
        partition_list
            .add_partition_offset(&self.topic_name, partition_id, initial_offset)
            .expect("offset known to be valid");
        self.consumer
            .assign(&partition_list)
            .expect("assignment known to be valid");

        let context = Arc::clone(self.consumer.context());
        for pc in &mut self.partition_consumers {
            pc.partition_queue = self
                .consumer
                .split_partition_queue(&self.topic_name, pc.pid)
                .expect("partition known to be valid");
            pc.partition_queue.set_nonempty_callback({
                let context = Arc::clone(&context);
                move || context.inner().activate()
            });
        }

        let mut partition_queue = self
            .consumer
            .split_partition_queue(&self.topic_name, partition_id)
            .expect("partition known to be valid");
        partition_queue.set_nonempty_callback(move || context.inner().activate());
        self.partition_consumers
            .push(PartitionConsumer::new(partition_id, partition_queue));
        assert_eq!(
            self.consumer
                .assignment()
                .unwrap()
                .elements_for_topic(&self.topic_name)
                .len(),
            self.partition_consumers.len()
        );
    }

    fn update_stats(&mut self) {
        while let Ok(stats) = self.stats_rx.try_recv() {
            match serde_json::from_str::<Statistics>(&stats.to_string()) {
                Ok(statistics) => {
                    let topic = statistics.topics.get(&self.topic_name);
                    match topic {
                        Some(topic) => {
                            for (id, partition) in &topic.partitions {
                                self.partition_metrics
                                    .set_offset_max(*id, partition.hi_offset);
                            }
                        }
                        None => error!("No stats found for topic: {}", &self.topic_name),
                    }
                }
                Err(e) => {
                    error!("failed decoding librdkafka statistics JSON: {}", e);
                }
            }
        }
    }

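    /// Checks whether the given message is viable for emission. Returns the message, its
    /// timestamp, and a diff if its offset advances past the last offset recorded for this
    /// output and partition, and `None` otherwise.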
    fn handle_message(
        &mut self,
        message: Result<SourceMessage, KafkaHeaderParseError>,
        (partition, offset): (PartitionId, MzOffset),
        output_index: &usize,
    ) -> Option<(
        Result<SourceMessage, KafkaHeaderParseError>,
        KafkaTimestamp,
        Diff,
    )> {
        assert!(
            self.last_offsets
                .get(output_index)
                .unwrap()
                .contains_key(&partition)
        );

        let last_offset_ref = self
            .last_offsets
            .get_mut(output_index)
            .expect("output known to be installed")
            .get_mut(&partition)
            .expect("partition known to be installed");

        let last_offset = *last_offset_ref;
        let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
        if offset_as_i64 <= last_offset {
            info!(
                source_id = self.id.to_string(),
                worker_id = self.worker_id,
                num_workers = self.worker_count,
                "kafka message before expected offset: \
                source {} (reading topic {}, partition {}, output {}) \
                received offset {} expected offset {:?}",
                self.source_name,
                self.topic_name,
                partition,
                output_index,
                offset.offset,
                last_offset + 1,
            );
            None
        } else {
            *last_offset_ref = offset_as_i64;

            let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
            Some((message, ts, Diff::ONE))
        }
    }
}

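/// Constructs a `SourceMessage` (key, value, and any requested metadata columns) from a raw
/// Kafka message, together with the partition and offset it was read from.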
fn construct_source_message(
    msg: &BorrowedMessage<'_>,
    metadata_columns: &[KafkaMetadataKind],
) -> (
    Result<SourceMessage, KafkaHeaderParseError>,
    (PartitionId, MzOffset),
) {
    let pid = msg.partition();
    let Ok(offset) = u64::try_from(msg.offset()) else {
        panic!(
            "got negative offset ({}) from otherwise non-error'd kafka message",
            msg.offset()
        );
    };

    let mut metadata = Row::default();
    let mut packer = metadata.packer();
    for kind in metadata_columns {
        match kind {
            KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
            KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
            KafkaMetadataKind::Timestamp => {
                let ts = msg
                    .timestamp()
                    .to_millis()
                    .expect("kafka sources always have upstream_time");

                let d: Datum = DateTime::from_timestamp_millis(ts)
                    .and_then(|dt| {
                        let ct: Option<CheckedTimestamp<NaiveDateTime>> =
                            dt.naive_utc().try_into().ok();
                        ct
                    })
                    .into();
                packer.push(d)
            }
            KafkaMetadataKind::Header { key, use_bytes } => {
                match msg.headers() {
                    Some(headers) => {
                        let d = headers
                            .iter()
                            .filter(|header| header.key == key)
                            .last()
                            .map(|header| match header.value {
                                Some(v) => {
                                    if *use_bytes {
                                        Ok(Datum::Bytes(v))
                                    } else {
                                        match str::from_utf8(v) {
                                            Ok(str) => Ok(Datum::String(str)),
                                            Err(_) => Err(KafkaHeaderParseError::Utf8Error {
                                                key: key.clone(),
                                                raw: v.to_vec(),
                                            }),
                                        }
                                    }
                                }
                                None => Ok(Datum::Null),
                            })
                            .unwrap_or_else(|| {
                                Err(KafkaHeaderParseError::KeyNotFound { key: key.clone() })
                            });
                        match d {
                            Ok(d) => packer.push(d),
                            Err(err) => return (Err(err), (pid, offset.into())),
                        }
                    }
                    None => packer.push(Datum::Null),
                }
            }
            KafkaMetadataKind::Headers => {
                packer.push_list_with(|r| {
                    if let Some(headers) = msg.headers() {
                        for header in headers.iter() {
                            match header.value {
                                Some(v) => r.push_list_with(|record_row| {
                                    record_row.push(Datum::String(header.key));
                                    record_row.push(Datum::Bytes(v));
                                }),
                                None => r.push_list_with(|record_row| {
                                    record_row.push(Datum::String(header.key));
                                    record_row.push(Datum::Null);
                                }),
                            }
                        }
                    }
                });
            }
        }
    }

    let key = match msg.key() {
        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
        None => Row::pack([Datum::Null]),
    };
    let value = match msg.payload() {
        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
        None => Row::pack([Datum::Null]),
    };
    (
        Ok(SourceMessage {
            key,
            value,
            metadata,
        }),
        (pid, offset.into()),
    )
}

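/// A wrapper around a partition queue that polls messages from a single Kafka partition.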
struct PartitionConsumer {
    pid: PartitionId,
    partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
}

impl PartitionConsumer {
    fn new(
        pid: PartitionId,
        partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
    ) -> Self {
        PartitionConsumer {
            pid,
            partition_queue,
        }
    }

    fn get_next_message(&self) -> Result<Option<(BorrowedMessage<'_>, PartitionId)>, KafkaError> {
        match self.partition_queue.poll(Duration::from_millis(0)) {
            Some(Ok(msg)) => Ok(Some((msg, self.pid))),
            Some(Err(err)) => Err(err),
            _ => Ok(None),
        }
    }

    fn pid(&self) -> PartitionId {
        self.pid
    }
}

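/// An rdkafka client context that forwards statistics to the reader operator and wakes it up
/// whenever new data or statistics arrive.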
struct GlueConsumerContext {
    notificator: Arc<Notify>,
    stats_tx: crossbeam_channel::Sender<Jsonb>,
    inner: MzClientContext,
}

impl ClientContext for GlueConsumerContext {
    fn stats_raw(&self, statistics: &[u8]) {
        match Jsonb::from_slice(statistics) {
            Ok(statistics) => {
                self.stats_tx
                    .send(statistics)
                    .expect("timely operator hung up while Kafka source active");
                self.activate();
            }
            Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
        };
    }

    fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
        self.inner.log(level, fac, log_message)
    }
    fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
        self.inner.error(error, reason)
    }
}

impl GlueConsumerContext {
    fn activate(&self) {
        self.notificator.notify_one();
    }
}

impl ConsumerContext for GlueConsumerContext {}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use mz_kafka_util::client::create_new_client_config_simple;
    use rdkafka::consumer::{BaseConsumer, Consumer};
    use rdkafka::{Message, Offset, TopicPartitionList};
    use uuid::Uuid;

    #[mz_ore::test]
    #[ignore]
    fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
        let topic_name = "queue-test";
        let pid = 0;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
        kafka_config.set("enable.auto.commit", "false");
        kafka_config.set("group.id", Uuid::new_v4().to_string());
        kafka_config.set("fetch.message.max.bytes", "100");
        let consumer: BaseConsumer<_> = kafka_config.create()?;

        let consumer = Arc::new(consumer);

        let mut partition_list = TopicPartitionList::new();
        partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;

        consumer.assign(&partition_list)?;

        let partition_queue = consumer
            .split_partition_queue(topic_name, pid)
            .expect("missing partition queue");

        let expected_messages = 1_000;

        let mut common_queue_count = 0;
        let mut partition_queue_count = 0;

        loop {
            if let Some(msg) = consumer.poll(Duration::from_millis(0)) {
                match msg {
                    Ok(msg) => {
                        let _payload =
                            std::str::from_utf8(msg.payload().expect("missing payload"))?;
                        if partition_queue_count > 0 {
                            anyhow::bail!(
                                "Got message from common queue after we internally switched to partition queue."
                            );
                        }

                        common_queue_count += 1;
                    }
                    Err(err) => anyhow::bail!("{}", err),
                }
            }

            match partition_queue.poll(Duration::from_millis(0)) {
                Some(Ok(msg)) => {
                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
                    partition_queue_count += 1;
                }
                Some(Err(err)) => anyhow::bail!("{}", err),
                _ => (),
            }

            if (common_queue_count + partition_queue_count) == expected_messages {
                break;
            }
        }

        assert!(
            common_queue_count == 0,
            "Got {} out of {} messages from common queue. Partition queue: {}",
            common_queue_count,
            expected_messages,
            partition_queue_count
        );

        Ok(())
    }
}

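/// Fetches the partitions of `topic` along with the current high watermark offset of each.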
fn fetch_partition_info<C: ConsumerContext>(
    consumer: &BaseConsumer<C>,
    topic: &str,
    fetch_timeout: Duration,
) -> Result<BTreeMap<PartitionId, HighWatermark>, GetPartitionsError> {
    let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;

    let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
    for pid in pids {
        offset_requests.add_partition_offset(topic, pid, Offset::End)?;
    }

    let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;

    let mut result = BTreeMap::new();
    for entry in offset_responses.elements() {
        let offset = match entry.offset() {
            Offset::Offset(offset) => offset,
            offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
        };

        let pid = entry.partition();
        let watermark = offset.try_into().expect("invalid negative offset");
        result.insert(pid, watermark);
    }

    Ok(result)
}

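/// An update produced by the metadata fetcher: either the current partitions with their high
/// watermarks, or a transient or definite error.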
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum MetadataUpdate {
    Partitions(BTreeMap<PartitionId, HighWatermark>),
    TransientError(HealthStatus),
    DefiniteError(SourceError),
}

impl MetadataUpdate {
    fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
        match self {
            Self::Partitions(partitions) => {
                let max_pid = partitions.keys().last().copied();
                let lower = max_pid
                    .map(RangeBound::after)
                    .unwrap_or(RangeBound::NegInfinity);
                let future_ts =
                    Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));

                let mut frontier = Antichain::from_elem(future_ts);
                for (pid, high_watermark) in partitions {
                    frontier.insert(Partitioned::new_singleton(
                        RangeBound::exact(*pid),
                        MzOffset::from(*high_watermark),
                    ));
                }

                Some(frontier)
            }
            Self::DefiniteError(_) => Some(Antichain::new()),
            Self::TransientError(_) => None,
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum KafkaHeaderParseError {
    #[error("A header with key '{key}' was not found in the message headers")]
    KeyNotFound { key: String },
    #[error(
        "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
    )]
    Utf8Error { key: String, raw: Vec<u8> },
}

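/// Renders the metadata fetcher operator, which runs on a single worker and periodically
/// publishes partition metadata updates and upstream frontier probes for the source.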
fn render_metadata_fetcher<G: Scope<Timestamp = KafkaTimestamp>>(
    scope: &G,
    connection: KafkaSourceConnection,
    config: RawSourceCreationConfig,
) -> (
    Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
    Stream<G, Probe<KafkaTimestamp>>,
    PressOnDropButton,
) {
    let active_worker_id = usize::cast_from(config.id.hashed());
    let is_active_worker = active_worker_id % scope.peers() == scope.index();

    let resume_upper = Antichain::from_iter(
        config
            .source_resume_uppers
            .values()
            .map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row))
            .flatten(),
    );

    let name = format!("KafkaMetadataFetcher({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());

    let (metadata_output, metadata_stream) = builder.new_output();
    let (probe_output, probe_stream) = builder.new_output();

    let button = builder.build(move |caps| async move {
        if !is_active_worker {
            return;
        }

        let [metadata_cap, probe_cap] = caps.try_into().unwrap();

        let client_id = connection.client_id(
            config.config.config_set(),
            &config.config.connection_context,
            config.id,
        );
        let KafkaSourceConnection {
            connection,
            topic,
            topic_metadata_refresh_interval,
            ..
        } = connection;

        let consumer: Result<BaseConsumer<_>, _> = connection
            .create_with_context(
                &config.config,
                MzClientContext::default(),
                &btreemap! {
                    "topic.metadata.refresh.interval.ms" =>
                        topic_metadata_refresh_interval
                            .as_millis()
                            .to_string(),
                    "client.id" => format!("{client_id}-metadata"),
                },
                InTask::Yes,
            )
            .await;

        let consumer = match consumer {
            Ok(consumer) => consumer,
            Err(e) => {
                let msg = format!(
                    "failed creating kafka metadata consumer: {}",
                    e.display_with_causes()
                );
                let status_update = HealthStatusUpdate::halting(msg, None);
                let status = match e {
                    ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
                    _ => HealthStatus::kafka(status_update),
                };
                let error = MetadataUpdate::TransientError(status);
                let timestamp = (config.now_fn)().into();
                metadata_output.give(&metadata_cap, (timestamp, error));

                // Stall forever; the halting status emitted above reports the failure.
                std::future::pending::<()>().await;
                unreachable!("pending future never returns");
            }
        };

        let (tx, mut rx) = mpsc::unbounded_channel();
        spawn_metadata_thread(config, consumer, topic, tx);

        let mut prev_upstream_frontier = resume_upper;

        while let Some((timestamp, mut update)) = rx.recv().await {
            if prev_upstream_frontier.is_empty() {
                return;
            }

            if let Some(upstream_frontier) = update.upstream_frontier() {
                // If the upstream frontier regressed, the topic was recreated, which we report
                // as a definite error.
                if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
                    let error = SourceError {
                        error: SourceErrorDetails::Other("topic was recreated".into()),
                    };
                    update = MetadataUpdate::DefiniteError(error);
                }
            }

            if let Some(upstream_frontier) = update.upstream_frontier() {
                prev_upstream_frontier = upstream_frontier.clone();

                let probe = Probe {
                    probe_ts: timestamp,
                    upstream_frontier,
                };
                probe_output.give(&probe_cap, probe);
            }

            metadata_output.give(&metadata_cap, (timestamp, update));
        }
    });

    (metadata_stream, probe_stream, button.press_on_drop())
}

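/// Spawns a thread that periodically fetches the partition metadata of `topic` and forwards the
/// resulting `MetadataUpdate`s, tagged with their probe timestamp, over `tx`.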
fn spawn_metadata_thread<C: ConsumerContext>(
    config: RawSourceCreationConfig,
    consumer: BaseConsumer<TunnelingClientContext<C>>,
    topic: String,
    tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
) {
    thread::Builder::new()
        .name(format!("kfk-mtdt-{}", config.id))
        .spawn(move || {
            trace!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka metadata thread: starting..."
            );

            let mut ticker = probe::Ticker::new(
                || KAFKA_METADATA_FETCH_INTERVAL.get(config.config.config_set()),
                config.now_fn,
            );

            loop {
                let probe_ts = ticker.tick_blocking();
                let result = fetch_partition_info(
                    &consumer,
                    &topic,
                    config
                        .config
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                );
                trace!(
                    source_id = config.id.to_string(),
                    worker_id = config.worker_id,
                    num_workers = config.worker_count,
                    "kafka metadata thread: metadata fetch result: {:?}",
                    result
                );
                let update = match result {
                    Ok(partitions) => {
                        trace!(
                            source_id = config.id.to_string(),
                            worker_id = config.worker_id,
                            num_workers = config.worker_count,
                            "kafka metadata thread: fetched partition metadata info",
                        );

                        MetadataUpdate::Partitions(partitions)
                    }
                    Err(GetPartitionsError::TopicDoesNotExist) => {
                        let error = SourceError {
                            error: SourceErrorDetails::Other("topic was deleted".into()),
                        };
                        MetadataUpdate::DefiniteError(error)
                    }
                    Err(e) => {
                        let kafka_status = Some(HealthStatusUpdate::stalled(
                            format!("{}", e.display_with_causes()),
                            None,
                        ));

                        let ssh_status = consumer.client().context().tunnel_status();
                        let ssh_status = match ssh_status {
                            SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
                            SshTunnelStatus::Errored(e) => {
                                Some(HealthStatusUpdate::stalled(e, None))
                            }
                        };

                        MetadataUpdate::TransientError(HealthStatus {
                            kafka: kafka_status,
                            ssh: ssh_status,
                        })
                    }
                };

                if tx.send((probe_ts, update)).is_err() {
                    break;
                }
            }

            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka metadata thread: receiver has gone away; shutting down."
            )
        })
        .unwrap();
}