use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::convert::Infallible;
use std::str::{self};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use anyhow::anyhow;
use chrono::{DateTime, NaiveDateTime};
use differential_dataflow::{AsCollection, Hashable};
use futures::StreamExt;
use itertools::Itertools;
use maplit::btreemap;
use mz_kafka_util::client::{
    GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
use mz_ssh_util::tunnel::SshTunnelStatus;
use mz_storage_types::dyncfgs::KAFKA_METADATA_FETCH_INTERVAL;
use mz_storage_types::errors::{
    ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
};
use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use mz_timely_util::order::Partitioned;
use rdkafka::consumer::base_consumer::PartitionQueue;
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
use rdkafka::error::KafkaError;
use rdkafka::message::{BorrowedMessage, Headers};
use rdkafka::statistics::Statistics;
use rdkafka::topic_partition_list::Offset;
use rdkafka::{ClientContext, Message, TopicPartitionList};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::core::Partition;
use timely::dataflow::operators::{Broadcast, Capability};
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use timely::progress::Timestamp;
use tokio::sync::{Notify, mpsc};
use tracing::{error, info, trace};

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::metrics::source::kafka::KafkaSourceMetrics;
use crate::source::types::{Probe, SignaledFuture, SourceRender, StackedCollection};
use crate::source::{RawSourceCreationConfig, SourceMessage, probe};
use crate::statistics::SourceStatistics;

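/// A combined health status for the Kafka and SSH status namespaces, as reported by the
/// metadata fetcher.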
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
struct HealthStatus {
    kafka: Option<HealthStatusUpdate>,
    ssh: Option<HealthStatusUpdate>,
}

impl HealthStatus {
    fn kafka(update: HealthStatusUpdate) -> Self {
        Self {
            kafka: Some(update),
            ssh: None,
        }
    }

    fn ssh(update: HealthStatusUpdate) -> Self {
        Self {
            kafka: None,
            ssh: Some(update),
        }
    }
}

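/// Per-worker state of the Kafka source reader: the underlying rdkafka consumer and its
/// split-off partition queues, the last offset handled per (output, partition), and the
/// capabilities used to manage the per-partition frontiers.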
pub struct KafkaSourceReader {
    topic_name: String,
    source_name: String,
    id: GlobalId,
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    partition_consumers: Vec<PartitionConsumer>,
    worker_id: usize,
    worker_count: usize,
    last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
    start_offsets: BTreeMap<PartitionId, i64>,
    stats_rx: crossbeam_channel::Receiver<Jsonb>,
    partition_metrics: KafkaSourceMetrics,
    partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
}

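/// The capabilities held for a single partition: one to emit data at and one to downgrade
/// the progress frontier with.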
struct PartitionCapability {
    data: Capability<KafkaTimestamp>,
    progress: Capability<KafkaTimestamp>,
}

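/// The high watermark offset of a Kafka partition.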
type HighWatermark = u64;

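/// Commits resume upper frontiers to the upstream Kafka consumer group and records the
/// committed offsets in the per-export statistics.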
pub struct KafkaResumeUpperProcessor {
    config: RawSourceCreationConfig,
    topic_name: String,
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    statistics: Vec<SourceStatistics>,
}

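/// Whether this worker is responsible for consuming the given partition. Partitions are
/// distributed round-robin across workers, starting from the worker responsible for the
/// source id itself; e.g. with 4 workers and a source whose responsible worker is 2,
/// partition 0 goes to worker 2, partition 1 to worker 3, partition 2 to worker 0, and so on.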
fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
    let pid = usize::try_from(pid).expect("positive pid");
    ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
}

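/// Information about a single source export: its id, output index, resume upper, and the
/// metadata columns to attach to each of its messages.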
struct SourceOutputInfo {
    id: GlobalId,
    output_index: usize,
    resume_upper: Antichain<KafkaTimestamp>,
    metadata_columns: Vec<KafkaMetadataKind>,
}

impl SourceRender for KafkaSourceConnection {
    type Time = KafkaTimestamp;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;

    fn render<G: Scope<Timestamp = KafkaTimestamp>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Option<Stream<G, Probe<KafkaTimestamp>>>,
        Vec<PressOnDropButton>,
    ) {
        let (metadata, probes, metadata_token) =
            render_metadata_fetcher(scope, self.clone(), config.clone());
        let (data, progress, health, reader_token) = render_reader(
            scope,
            self,
            config.clone(),
            resume_uppers,
            metadata,
            start_signal,
        );

        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
            partition_count,
            |((output, data), time, diff): &(
                (usize, Result<SourceMessage, DataflowError>),
                _,
                Diff,
            )| {
                let output = u64::cast_from(*output);
                (output, (data.clone(), time.clone(), diff.clone()))
            },
        );
        let mut data_collections = BTreeMap::new();
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        (
            data_collections,
            progress,
            health,
            Some(probes),
            vec![metadata_token, reader_token],
        )
    }
}

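/// Renders the reader operator of the Kafka source pipeline. It creates the rdkafka consumer,
/// maintains one split-off partition queue per assigned partition, and emits raw messages,
/// progress updates, and health status updates.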
fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
    scope: &G,
    connection: KafkaSourceConnection,
    config: RawSourceCreationConfig,
    resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
    metadata_stream: Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
    start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
) {
    let name = format!("KafkaReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());

    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let mut metadata_input = builder.new_disconnected_input(&metadata_stream.broadcast(), Pipeline);

    let mut outputs = vec![];

    let mut all_export_stats = vec![];
    let mut snapshot_export_stats = vec![];
    for (idx, (id, export)) in config.source_exports.iter().enumerate() {
        let SourceExport {
            details,
            storage_metadata: _,
            data_config: _,
        } = export;
        let resume_upper = Antichain::from_iter(
            config
                .source_resume_uppers
                .get(id)
                .expect("all source exports must be present in source resume uppers")
                .iter()
                .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
        );

        let metadata_columns = match details {
            SourceExportDetails::Kafka(details) => details
                .metadata_columns
                .iter()
                .map(|(_name, kind)| kind.clone())
                .collect::<Vec<_>>(),
            _ => panic!("unexpected source export details: {:?}", details),
        };

        let statistics = config
            .statistics
            .get(id)
            .expect("statistics have been initialized")
            .clone();
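        // An export whose resume upper is still the minimum timestamp has not committed any
        // progress yet and is therefore still snapshotting.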
        if resume_upper.as_ref() == &[Partitioned::minimum()] {
            snapshot_export_stats.push(statistics.clone());
        }
        all_export_stats.push(statistics);

        let output = SourceOutputInfo {
            id: *id,
            resume_upper,
            output_index: idx,
            metadata_columns,
        };
        outputs.push(output);
    }

    let busy_signal = Arc::clone(&config.busy_signal);
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut data_cap, mut progress_cap, health_cap] = caps.try_into().unwrap();

            let client_id = connection.client_id(
                config.config.config_set(),
                &config.config.connection_context,
                config.id,
            );
            let group_id = connection.group_id(&config.config.connection_context, config.id);
            let KafkaSourceConnection {
                connection,
                topic,
                topic_metadata_refresh_interval,
                start_offsets,
                metadata_columns: _,
                connection_id: _,
                group_id_prefix: _,
            } = connection;

            let mut start_offsets: BTreeMap<_, i64> = start_offsets
                .clone()
                .into_iter()
                .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
                .map(|(k, v)| (k, v))
                .collect();

            let mut partition_capabilities = BTreeMap::new();
            let mut max_pid = None;
            let resume_upper = Antichain::from_iter(
                outputs
                    .iter()
                    .map(|output| output.resume_upper.clone())
                    .flatten(),
            );

            for ts in resume_upper.elements() {
                if let Some(pid) = ts.interval().singleton() {
                    let pid = pid.unwrap_exact();
                    max_pid = std::cmp::max(max_pid, Some(*pid));
                    if responsible_for_pid(&config, *pid) {
                        let restored_offset = i64::try_from(ts.timestamp().offset)
                            .expect("restored kafka offsets must fit into i64");
                        if let Some(start_offset) = start_offsets.get_mut(pid) {
                            *start_offset = std::cmp::max(restored_offset, *start_offset);
                        } else {
                            start_offsets.insert(*pid, restored_offset);
                        }

                        let part_ts = Partitioned::new_singleton(
                            RangeBound::exact(*pid),
                            ts.timestamp().clone(),
                        );
                        let part_cap = PartitionCapability {
                            data: data_cap.delayed(&part_ts),
                            progress: progress_cap.delayed(&part_ts),
                        };
                        partition_capabilities.insert(*pid, part_cap);
                    }
                }
            }
            let lower = max_pid
                .map(RangeBound::after)
                .unwrap_or(RangeBound::NegInfinity);
            let future_ts =
                Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
            data_cap.downgrade(&future_ts);
            progress_cap.downgrade(&future_ts);

            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "instantiating Kafka source reader at offsets {start_offsets:?}"
            );

            let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
            let notificator = Arc::new(Notify::new());

            let consumer: Result<BaseConsumer<_>, _> = connection
                .create_with_context(
                    &config.config,
                    GlueConsumerContext {
                        notificator: Arc::clone(&notificator),
                        stats_tx,
                        inner: MzClientContext::default(),
                    },
                    &btreemap! {
                        "enable.auto.commit" => "false".into(),
                        "auto.offset.reset" => "earliest".into(),
                        "topic.metadata.refresh.interval.ms" =>
                            topic_metadata_refresh_interval
                                .as_millis()
                                .to_string(),
                        "fetch.message.max.bytes" => "134217728".into(),
                        "group.id" => group_id.clone(),
                        "client.id" => client_id.clone(),
                    },
                    InTask::Yes,
                )
                .await;

            let consumer = match consumer {
                Ok(consumer) => Arc::new(consumer),
                Err(e) => {
                    let update = HealthStatusUpdate::halting(
                        format!(
                            "failed creating kafka reader consumer: {}",
                            e.display_with_causes()
                        ),
                        None,
                    );
                    health_output.give(
                        &health_cap,
                        HealthStatusMessage {
                            id: None,
                            namespace: if matches!(e, ContextCreationError::Ssh(_)) {
                                StatusNamespace::Ssh
                            } else {
                                StatusNamespace::Kafka
                            },
                            update: update.clone(),
                        },
                    );
                    for (output, update) in outputs.iter().repeat_clone(update) {
                        health_output.give(
                            &health_cap,
                            HealthStatusMessage {
                                id: Some(output.id),
                                namespace: if matches!(e, ContextCreationError::Ssh(_)) {
                                    StatusNamespace::Ssh
                                } else {
                                    StatusNamespace::Kafka
                                },
                                update,
                            },
                        );
                    }
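                    // Don't return: dropping the capabilities would advance the frontiers to
                    // the empty antichain, which reads as a completed source. Instead, stay
                    // wedged until the operator is torn down.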
                    std::future::pending::<()>().await;
                    unreachable!("pending future never returns");
                }
            };

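            // Wait for the start signal before creating partition queues and reading from the
            // topic; the log line below marks the point at which rehydration finished.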
            start_signal.await;
            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka worker noticed rehydration is finished, starting partition queues..."
            );

            let partition_ids = start_offsets.keys().copied().collect();
            let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);

            let mut reader = KafkaSourceReader {
                topic_name: topic.clone(),
                source_name: config.name.clone(),
                id: config.id,
                partition_consumers: Vec::new(),
                consumer: Arc::clone(&consumer),
                worker_id: config.worker_id,
                worker_count: config.worker_count,
                last_offsets: outputs
                    .iter()
                    .map(|output| (output.output_index, BTreeMap::new()))
                    .collect(),
                start_offsets,
                stats_rx,
                partition_metrics: config.metrics.get_kafka_source_metrics(
                    partition_ids,
                    topic.clone(),
                    config.id,
                ),
                partition_capabilities,
            };

            let offset_committer = KafkaResumeUpperProcessor {
                config: config.clone(),
                topic_name: topic.clone(),
                consumer,
                statistics: all_export_stats.clone(),
            };

            if !snapshot_export_stats.is_empty() {
                if let Err(e) = offset_committer
                    .process_frontier(resume_upper.clone())
                    .await
                {
                    offset_commit_metrics.offset_commit_failures.inc();
                    tracing::warn!(
                        %e,
                        "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
                        worker_id = config.worker_id,
                        source_id = config.id,
                        upper = resume_upper.pretty()
                    );
                }
                for statistics in config.statistics.values() {
                    statistics.set_snapshot_records_known(0);
                    statistics.set_snapshot_records_staged(0);
                }
            }

            let resume_uppers_process_loop = async move {
                tokio::pin!(resume_uppers);
                while let Some(frontier) = resume_uppers.next().await {
                    if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
                        offset_commit_metrics.offset_commit_failures.inc();
                        tracing::warn!(
                            %e,
                            "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
                            worker_id = config.worker_id,
                            source_id = config.id,
                            upper = frontier.pretty()
                        );
                    }
                }
                std::future::pending::<()>().await;
            };
            tokio::pin!(resume_uppers_process_loop);

            let mut metadata_update: Option<MetadataUpdate> = None;
            let mut snapshot_total = None;

            let max_wait_time =
                mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
            loop {
                tokio::select! {
                    _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},

                    _ = metadata_input.ready() => {
                        let mut updates = Vec::new();
                        while let Some(event) = metadata_input.next_sync() {
                            if let Event::Data(_, mut data) = event {
                                updates.append(&mut data);
                            }
                        }
                        metadata_update = updates
                            .into_iter()
                            .max_by_key(|(ts, _)| *ts)
                            .map(|(_, update)| update);
                    }

                    _ = resume_uppers_process_loop.as_mut() => {},
                }

                match metadata_update.take() {
                    Some(MetadataUpdate::Partitions(partitions)) => {
                        let max_pid = partitions.keys().last().cloned();
                        let lower = max_pid
                            .map(RangeBound::after)
                            .unwrap_or(RangeBound::NegInfinity);
                        let future_ts = Partitioned::new_range(
                            lower,
                            RangeBound::PosInfinity,
                            MzOffset::from(0),
                        );

                        let mut offset_known = 0;
                        for (&pid, &high_watermark) in &partitions {
                            if responsible_for_pid(&config, pid) {
                                offset_known += high_watermark;
                                reader.ensure_partition(pid);
                                if let Entry::Vacant(entry) =
                                    reader.partition_capabilities.entry(pid)
                                {
                                    let start_offset = match reader.start_offsets.get(&pid) {
                                        Some(&offset) => offset.try_into().unwrap(),
                                        None => 0u64,
                                    };
                                    let part_since_ts = Partitioned::new_singleton(
                                        RangeBound::exact(pid),
                                        MzOffset::from(start_offset),
                                    );
                                    let part_upper_ts = Partitioned::new_singleton(
                                        RangeBound::exact(pid),
                                        MzOffset::from(high_watermark),
                                    );

                                    entry.insert(PartitionCapability {
                                        data: data_cap.delayed(&part_since_ts),
                                        progress: progress_cap.delayed(&part_upper_ts),
                                    });
                                }
                            }
                        }

                        if !snapshot_export_stats.is_empty() && snapshot_total.is_none() {
                            snapshot_total = Some(offset_known);
                        }

                        for output in &outputs {
                            for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace,
                                        update: HealthStatusUpdate::running(),
                                    },
                                );
                            }
                        }
                        for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace,
                                    update: HealthStatusUpdate::running(),
                                },
                            );
                        }

                        for export_stat in all_export_stats.iter() {
                            export_stat.set_offset_known(offset_known);
                        }

                        data_cap.downgrade(&future_ts);
                        progress_cap.downgrade(&future_ts);
                    }
                    Some(MetadataUpdate::TransientError(status)) => {
                        if let Some(update) = status.kafka {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Kafka,
                                    update: update.clone(),
                                },
                            );
                            for (output, update) in outputs.iter().repeat_clone(update) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Kafka,
                                        update,
                                    },
                                );
                            }
                        }
                        if let Some(update) = status.ssh {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Ssh,
                                    update: update.clone(),
                                },
                            );
                            for (output, update) in outputs.iter().repeat_clone(update) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Ssh,
                                        update,
                                    },
                                );
                            }
                        }
                    }
                    Some(MetadataUpdate::DefiniteError(error)) => {
                        health_output.give(
                            &health_cap,
                            HealthStatusMessage {
                                id: None,
                                namespace: StatusNamespace::Kafka,
                                update: HealthStatusUpdate::stalled(
                                    error.to_string(),
                                    None,
                                ),
                            },
                        );
                        let error = Err(error.into());
                        let time = data_cap.time().clone();
                        for (output, error) in
                            outputs.iter().map(|o| o.output_index).repeat_clone(error)
                        {
                            data_output
                                .give_fueled(&data_cap, ((output, error), time, Diff::ONE))
                                .await;
                        }

                        return;
                    }
                    None => {}
                }

                while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
                    match result {
                        Err(e) => {
                            let error = format!(
                                "kafka error when polling consumer for source: {} topic: {} : {}",
                                reader.source_name, reader.topic_name, e
                            );
                            let status = HealthStatusUpdate::stalled(error, None);
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Kafka,
                                    update: status.clone(),
                                },
                            );
                            for (output, status) in outputs.iter().repeat_clone(status) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Kafka,
                                        update: status,
                                    },
                                );
                            }
                        }
                        Ok(message) => {
                            let output_messages = outputs
                                .iter()
                                .map(|output| {
                                    let (message, ts) = construct_source_message(
                                        &message,
                                        &output.metadata_columns,
                                    );
                                    (output.output_index, message, ts)
                                })
                                .collect::<Vec<_>>();
                            for (output_index, message, ts) in output_messages {
                                if let Some((msg, time, diff)) =
                                    reader.handle_message(message, ts, &output_index)
                                {
                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
                                    let part_cap = &reader.partition_capabilities[pid].data;
                                    let msg = msg.map_err(|e| {
                                        DataflowError::SourceError(Box::new(SourceError {
                                            error: SourceErrorDetails::Other(e.to_string().into()),
                                        }))
                                    });
                                    data_output
                                        .give_fueled(part_cap, ((output_index, msg), time, diff))
                                        .await;
                                }
                            }
                        }
                    }
                }

                reader.update_stats();

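                // Take the partition consumers out of the reader so we can iterate over them
                // while still calling `&mut self` methods on the reader; they are put back
                // once the loop is done.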
                let mut consumers = std::mem::take(&mut reader.partition_consumers);
                for consumer in consumers.iter_mut() {
                    let pid = consumer.pid();
                    let mut partition_exhausted = false;
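                    // Read at most 10,000 messages from this partition per activation so a
                    // single busy partition cannot monopolize the operator; if the queue was
                    // not drained we notify ourselves to continue on the next activation.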
                    for _ in 0..10_000 {
                        let Some(message) = consumer.get_next_message().transpose() else {
                            partition_exhausted = true;
                            break;
                        };

                        for output in outputs.iter() {
                            let message = match &message {
                                Ok((msg, pid)) => {
                                    let (msg, ts) =
                                        construct_source_message(msg, &output.metadata_columns);
                                    assert_eq!(*pid, ts.0);
                                    Ok(reader.handle_message(msg, ts, &output.output_index))
                                }
                                Err(err) => Err(err),
                            };
                            match message {
                                Ok(Some((msg, time, diff))) => {
                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
                                    let part_cap = &reader.partition_capabilities[pid].data;
                                    let msg = msg.map_err(|e| {
                                        DataflowError::SourceError(Box::new(SourceError {
                                            error: SourceErrorDetails::Other(e.to_string().into()),
                                        }))
                                    });
                                    data_output
                                        .give_fueled(
                                            part_cap,
                                            ((output.output_index, msg), time, diff),
                                        )
                                        .await;
                                }
                                Ok(None) => continue,
                                Err(err) => {
                                    let last_offset = reader
                                        .last_offsets
                                        .get(&output.output_index)
                                        .expect("output known to be installed")
                                        .get(&pid)
                                        .expect("partition known to be installed");

                                    let status = HealthStatusUpdate::stalled(
                                        format!(
                                            "error consuming from source: {} topic: {topic}:\
                                            partition: {pid} last processed offset:\
                                            {last_offset} : {err}",
                                            config.name
                                        ),
                                        None,
                                    );
                                    health_output.give(
                                        &health_cap,
                                        HealthStatusMessage {
                                            id: None,
                                            namespace: StatusNamespace::Kafka,
                                            update: status.clone(),
                                        },
                                    );
                                    health_output.give(
                                        &health_cap,
                                        HealthStatusMessage {
                                            id: Some(output.id),
                                            namespace: StatusNamespace::Kafka,
                                            update: status,
                                        },
                                    );
                                }
                            }
                        }
                    }
                    if !partition_exhausted {
                        notificator.notify_one();
                    }
                }
                assert!(reader.partition_consumers.is_empty());
                reader.partition_consumers = consumers;

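                // Consult the consumer's current read positions to downgrade the per-partition
                // data and progress capabilities and to track snapshot progress.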
                let positions = reader.consumer.position().unwrap();
                let topic_positions = positions.elements_for_topic(&reader.topic_name);
                let mut snapshot_staged = 0;

                for position in topic_positions {
                    if let Offset::Offset(offset) = position.offset() {
                        let pid = position.partition();
                        let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
                        let upper =
                            Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);

                        let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
                        match part_cap.data.try_downgrade(&upper) {
                            Ok(()) => {
                                if !snapshot_export_stats.is_empty() {
                                    snapshot_staged += offset.try_into().unwrap_or(0u64);
                                    if let Some(snapshot_total) = snapshot_total {
                                        snapshot_staged =
                                            std::cmp::min(snapshot_staged, snapshot_total);
                                    }
                                }
                            }
                            Err(_) => {
                                info!(
                                    source_id = config.id.to_string(),
                                    worker_id = config.worker_id,
                                    num_workers = config.worker_count,
                                    "kafka source frontier downgrade skipped due to already \
                                    seen offset: {:?}",
                                    upper
                                );
                            }
                        };

                        let _ = part_cap.progress.try_downgrade(&upper);
                    }
                }

                if let (Some(snapshot_total), true) =
                    (snapshot_total, !snapshot_export_stats.is_empty())
                {
                    for export_stat in snapshot_export_stats.iter() {
                        export_stat.set_snapshot_records_known(snapshot_total);
                        export_stat.set_snapshot_records_staged(snapshot_staged);
                    }
                    if snapshot_total == snapshot_staged {
                        snapshot_export_stats.clear();
                    }
                }
            }
        })
    });

    (
        stream.as_collection(),
        progress_stream,
        health_stream,
        button.press_on_drop(),
    )
}

impl KafkaResumeUpperProcessor {
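    /// Commits the partitions of `frontier` that this worker is responsible for to the
    /// upstream consumer group and updates the offset-committed statistics.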
    async fn process_frontier(
        &self,
        frontier: Antichain<KafkaTimestamp>,
    ) -> Result<(), anyhow::Error> {
        use rdkafka::consumer::CommitMode;

        let mut offsets = vec![];
        let mut offset_committed = 0;
        for ts in frontier.iter() {
            if let Some(pid) = ts.interval().singleton() {
                let pid = pid.unwrap_exact();
                if responsible_for_pid(&self.config, *pid) {
                    offsets.push((pid.clone(), *ts.timestamp()));

                    offset_committed += ts.timestamp().offset;
                }
            }
        }

        for export_stat in self.statistics.iter() {
            export_stat.set_offset_committed(offset_committed);
        }

        if !offsets.is_empty() {
            let mut tpl = TopicPartitionList::new();
            for (pid, offset) in offsets {
                let offset_to_commit =
                    Offset::Offset(offset.offset.try_into().expect("offset to be valid i64"));
                tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
                    .expect("offset known to be valid");
            }
            let consumer = Arc::clone(&self.consumer);
            mz_ore::task::spawn_blocking(
                || format!("source({}) kafka offset commit", self.config.id),
                move || consumer.commit(&tpl, CommitMode::Sync),
            )
            .await?;
        }
        Ok(())
    }
}

impl KafkaSourceReader {
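    /// Ensures that a partition queue for `pid` exists, creating one at the configured start
    /// offset if this partition has not been seen before.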
    fn ensure_partition(&mut self, pid: PartitionId) {
        if self.last_offsets.is_empty() {
            tracing::info!(
                source_id = %self.id,
                worker_id = %self.worker_id,
                "kafka source does not have any outputs, not creating partition queue");

            return;
        }
        for last_offsets in self.last_offsets.values() {
            if last_offsets.contains_key(&pid) {
                return;
            }
        }

        let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
        self.create_partition_queue(pid, Offset::Offset(start_offset));

        for last_offsets in self.last_offsets.values_mut() {
            let prev = last_offsets.insert(pid, start_offset - 1);
            assert_none!(prev);
        }
    }

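    /// Creates a partition queue for `partition_id`, adds the partition to the consumer's
    /// assignment, and re-splits the queues of all previously created partition consumers.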
    fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
        info!(
            source_id = self.id.to_string(),
            worker_id = self.worker_id,
            num_workers = self.worker_count,
            "activating Kafka queue for topic {}, partition {}",
            self.topic_name,
            partition_id,
        );

        let tpl = self.consumer.assignment().unwrap();
        let mut partition_list = TopicPartitionList::new();
        for partition in tpl.elements_for_topic(&self.topic_name) {
            partition_list
                .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
                .expect("offset known to be valid");
        }
        partition_list
            .add_partition_offset(&self.topic_name, partition_id, initial_offset)
            .expect("offset known to be valid");
        self.consumer
            .assign(&partition_list)
            .expect("assignment known to be valid");

        let context = Arc::clone(self.consumer.context());
        for pc in &mut self.partition_consumers {
            pc.partition_queue = self
                .consumer
                .split_partition_queue(&self.topic_name, pc.pid)
                .expect("partition known to be valid");
            pc.partition_queue.set_nonempty_callback({
                let context = Arc::clone(&context);
                move || context.inner().activate()
            });
        }

        let mut partition_queue = self
            .consumer
            .split_partition_queue(&self.topic_name, partition_id)
            .expect("partition known to be valid");
        partition_queue.set_nonempty_callback(move || context.inner().activate());
        self.partition_consumers
            .push(PartitionConsumer::new(partition_id, partition_queue));
        assert_eq!(
            self.consumer
                .assignment()
                .unwrap()
                .elements_for_topic(&self.topic_name)
                .len(),
            self.partition_consumers.len()
        );
    }

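    /// Drains the statistics channel fed by the rdkafka statistics callback and records the
    /// latest per-partition high watermarks in the partition metrics.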
    fn update_stats(&mut self) {
        while let Ok(stats) = self.stats_rx.try_recv() {
            match serde_json::from_str::<Statistics>(&stats.to_string()) {
                Ok(statistics) => {
                    let topic = statistics.topics.get(&self.topic_name);
                    match topic {
                        Some(topic) => {
                            for (id, partition) in &topic.partitions {
                                self.partition_metrics
                                    .set_offset_max(*id, partition.hi_offset);
                            }
                        }
                        None => error!("No stats found for topic: {}", &self.topic_name),
                    }
                }
                Err(e) => {
                    error!("failed decoding librdkafka statistics JSON: {}", e);
                }
            }
        }
    }

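    /// Checks the given message against the last offset seen for its (output, partition)
    /// pair. Returns the message with its timestamp and diff if it advances the offset, or
    /// `None` if it was already handled.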
    fn handle_message(
        &mut self,
        message: Result<SourceMessage, KafkaHeaderParseError>,
        (partition, offset): (PartitionId, MzOffset),
        output_index: &usize,
    ) -> Option<(
        Result<SourceMessage, KafkaHeaderParseError>,
        KafkaTimestamp,
        Diff,
    )> {
        assert!(
            self.last_offsets
                .get(output_index)
                .unwrap()
                .contains_key(&partition)
        );

        let last_offset_ref = self
            .last_offsets
            .get_mut(output_index)
            .expect("output known to be installed")
            .get_mut(&partition)
            .expect("partition known to be installed");

        let last_offset = *last_offset_ref;
        let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
        if offset_as_i64 <= last_offset {
            info!(
                source_id = self.id.to_string(),
                worker_id = self.worker_id,
                num_workers = self.worker_count,
                "kafka message before expected offset: \
                source {} (reading topic {}, partition {}, output {}) \
                received offset {} expected offset {:?}",
                self.source_name,
                self.topic_name,
                partition,
                output_index,
                offset.offset,
                last_offset + 1,
            );
            None
        } else {
            *last_offset_ref = offset_as_i64;

            let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
            Some((message, ts, Diff::ONE))
        }
    }
}

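/// Builds a `SourceMessage` from a raw Kafka message, packing the requested metadata columns
/// (partition, offset, timestamp, and headers) into the metadata row.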
fn construct_source_message(
    msg: &BorrowedMessage<'_>,
    metadata_columns: &[KafkaMetadataKind],
) -> (
    Result<SourceMessage, KafkaHeaderParseError>,
    (PartitionId, MzOffset),
) {
    let pid = msg.partition();
    let Ok(offset) = u64::try_from(msg.offset()) else {
        panic!(
            "got negative offset ({}) from otherwise non-error'd kafka message",
            msg.offset()
        );
    };

    let mut metadata = Row::default();
    let mut packer = metadata.packer();
    for kind in metadata_columns {
        match kind {
            KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
            KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
            KafkaMetadataKind::Timestamp => {
                let ts = msg
                    .timestamp()
                    .to_millis()
                    .expect("kafka sources always have upstream_time");

                let d: Datum = DateTime::from_timestamp_millis(ts)
                    .and_then(|dt| {
                        let ct: Option<CheckedTimestamp<NaiveDateTime>> =
                            dt.naive_utc().try_into().ok();
                        ct
                    })
                    .into();
                packer.push(d)
            }
            KafkaMetadataKind::Header { key, use_bytes } => {
                match msg.headers() {
                    Some(headers) => {
                        let d = headers
                            .iter()
                            .filter(|header| header.key == key)
                            .last()
                            .map(|header| match header.value {
                                Some(v) => {
                                    if *use_bytes {
                                        Ok(Datum::Bytes(v))
                                    } else {
                                        match str::from_utf8(v) {
                                            Ok(str) => Ok(Datum::String(str)),
                                            Err(_) => Err(KafkaHeaderParseError::Utf8Error {
                                                key: key.clone(),
                                                raw: v.to_vec(),
                                            }),
                                        }
                                    }
                                }
                                None => Ok(Datum::Null),
                            })
                            .unwrap_or_else(|| {
                                Err(KafkaHeaderParseError::KeyNotFound { key: key.clone() })
                            });
                        match d {
                            Ok(d) => packer.push(d),
                            Err(err) => return (Err(err), (pid, offset.into())),
                        }
                    }
                    None => packer.push(Datum::Null),
                }
            }
            KafkaMetadataKind::Headers => {
                packer.push_list_with(|r| {
                    if let Some(headers) = msg.headers() {
                        for header in headers.iter() {
                            match header.value {
                                Some(v) => r.push_list_with(|record_row| {
                                    record_row.push(Datum::String(header.key));
                                    record_row.push(Datum::Bytes(v));
                                }),
                                None => r.push_list_with(|record_row| {
                                    record_row.push(Datum::String(header.key));
                                    record_row.push(Datum::Null);
                                }),
                            }
                        }
                    }
                });
            }
        }
    }

    let key = match msg.key() {
        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
        None => Row::pack([Datum::Null]),
    };
    let value = match msg.payload() {
        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
        None => Row::pack([Datum::Null]),
    };
    (
        Ok(SourceMessage {
            key,
            value,
            metadata,
        }),
        (pid, offset.into()),
    )
}

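/// A partition queue split off from the main consumer, tagged with the partition it reads.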
struct PartitionConsumer {
    pid: PartitionId,
    partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
}

impl PartitionConsumer {
    fn new(
        pid: PartitionId,
        partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
    ) -> Self {
        PartitionConsumer {
            pid,
            partition_queue,
        }
    }

    fn get_next_message(&self) -> Result<Option<(BorrowedMessage<'_>, PartitionId)>, KafkaError> {
        match self.partition_queue.poll(Duration::from_millis(0)) {
            Some(Ok(msg)) => Ok(Some((msg, self.pid))),
            Some(Err(err)) => Err(err),
            _ => Ok(None),
        }
    }

    fn pid(&self) -> PartitionId {
        self.pid
    }
}

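/// An rdkafka client context that glues the consumer to the timely operator: it forwards
/// statistics over a channel and wakes the operator through the notificator.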
struct GlueConsumerContext {
    notificator: Arc<Notify>,
    stats_tx: crossbeam_channel::Sender<Jsonb>,
    inner: MzClientContext,
}

impl ClientContext for GlueConsumerContext {
    fn stats_raw(&self, statistics: &[u8]) {
        match Jsonb::from_slice(statistics) {
            Ok(statistics) => {
                self.stats_tx
                    .send(statistics)
                    .expect("timely operator hung up while Kafka source active");
                self.activate();
            }
            Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
        };
    }

    fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
        self.inner.log(level, fac, log_message)
    }
    fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
        self.inner.error(error, reason)
    }
}

impl GlueConsumerContext {
    fn activate(&self) {
        self.notificator.notify_one();
    }
}

impl ConsumerContext for GlueConsumerContext {}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use mz_kafka_util::client::create_new_client_config_simple;
    use rdkafka::consumer::{BaseConsumer, Consumer};
    use rdkafka::{Message, Offset, TopicPartitionList};
    use uuid::Uuid;

    #[mz_ore::test]
    #[ignore]
    fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
        let topic_name = "queue-test";
        let pid = 0;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
        kafka_config.set("enable.auto.commit", "false");
        kafka_config.set("group.id", Uuid::new_v4().to_string());
        kafka_config.set("fetch.message.max.bytes", "100");
        let consumer: BaseConsumer<_> = kafka_config.create()?;

        let consumer = Arc::new(consumer);

        let mut partition_list = TopicPartitionList::new();
        partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;

        consumer.assign(&partition_list)?;

        let partition_queue = consumer
            .split_partition_queue(topic_name, pid)
            .expect("missing partition queue");

        let expected_messages = 1_000;

        let mut common_queue_count = 0;
        let mut partition_queue_count = 0;

        loop {
            if let Some(msg) = consumer.poll(Duration::from_millis(0)) {
                match msg {
                    Ok(msg) => {
                        let _payload =
                            std::str::from_utf8(msg.payload().expect("missing payload"))?;
                        if partition_queue_count > 0 {
                            anyhow::bail!(
                                "Got message from common queue after we internally switched to partition queue."
                            );
                        }

                        common_queue_count += 1;
                    }
                    Err(err) => anyhow::bail!("{}", err),
                }
            }

            match partition_queue.poll(Duration::from_millis(0)) {
                Some(Ok(msg)) => {
                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
                    partition_queue_count += 1;
                }
                Some(Err(err)) => anyhow::bail!("{}", err),
                _ => (),
            }

            if (common_queue_count + partition_queue_count) == expected_messages {
                break;
            }
        }

        assert!(
            common_queue_count == 0,
            "Got {} out of {} messages from common queue. Partition queue: {}",
            common_queue_count,
            expected_messages,
            partition_queue_count
        );

        Ok(())
    }
}

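/// Fetches the partitions of `topic` and the current high watermark of each one.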
fn fetch_partition_info<C: ConsumerContext>(
    consumer: &BaseConsumer<C>,
    topic: &str,
    fetch_timeout: Duration,
) -> Result<BTreeMap<PartitionId, HighWatermark>, GetPartitionsError> {
    let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;

    let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
    for pid in pids {
        offset_requests.add_partition_offset(topic, pid, Offset::End)?;
    }

    let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;

    let mut result = BTreeMap::new();
    for entry in offset_responses.elements() {
        let offset = match entry.offset() {
            Offset::Offset(offset) => offset,
            offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
        };

        let pid = entry.partition();
        let watermark = offset.try_into().expect("invalid negative offset");
        result.insert(pid, watermark);
    }

    Ok(result)
}

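/// An update produced by the metadata fetcher: the latest partitions and their high
/// watermarks, a transient (retryable) error, or a definite error that fails the source.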
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum MetadataUpdate {
    Partitions(BTreeMap<PartitionId, HighWatermark>),
    TransientError(HealthStatus),
    DefiniteError(SourceError),
}

impl MetadataUpdate {
    fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
        match self {
            Self::Partitions(partitions) => {
                let max_pid = partitions.keys().last().copied();
                let lower = max_pid
                    .map(RangeBound::after)
                    .unwrap_or(RangeBound::NegInfinity);
                let future_ts =
                    Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));

                let mut frontier = Antichain::from_elem(future_ts);
                for (pid, high_watermark) in partitions {
                    frontier.insert(Partitioned::new_singleton(
                        RangeBound::exact(*pid),
                        MzOffset::from(*high_watermark),
                    ));
                }

                Some(frontier)
            }
            Self::DefiniteError(_) => Some(Antichain::new()),
            Self::TransientError(_) => None,
        }
    }
}

#[derive(Debug, thiserror::Error)]
pub enum KafkaHeaderParseError {
    #[error("A header with key '{key}' was not found in the message headers")]
    KeyNotFound { key: String },
    #[error(
        "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
    )]
    Utf8Error { key: String, raw: Vec<u8> },
}

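/// Renders the metadata fetcher operator. It runs on a single worker, spawns a dedicated
/// thread that periodically fetches partition counts and high watermarks, and emits them as
/// `MetadataUpdate`s together with `Probe`s of the upstream frontier.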
fn render_metadata_fetcher<G: Scope<Timestamp = KafkaTimestamp>>(
    scope: &G,
    connection: KafkaSourceConnection,
    config: RawSourceCreationConfig,
) -> (
    Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
    Stream<G, Probe<KafkaTimestamp>>,
    PressOnDropButton,
) {
    let active_worker_id = usize::cast_from(config.id.hashed());
    let is_active_worker = active_worker_id % scope.peers() == scope.index();

    let resume_upper = Antichain::from_iter(
        config
            .source_resume_uppers
            .values()
            .map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row))
            .flatten(),
    );

    let name = format!("KafkaMetadataFetcher({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());

    let (metadata_output, metadata_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let button = builder.build(move |caps| async move {
        if !is_active_worker {
            return;
        }

        let [metadata_cap, probe_cap] = caps.try_into().unwrap();

        let client_id = connection.client_id(
            config.config.config_set(),
            &config.config.connection_context,
            config.id,
        );
        let KafkaSourceConnection {
            connection,
            topic,
            topic_metadata_refresh_interval,
            ..
        } = connection;

        let consumer: Result<BaseConsumer<_>, _> = connection
            .create_with_context(
                &config.config,
                MzClientContext::default(),
                &btreemap! {
                    "topic.metadata.refresh.interval.ms" =>
                        topic_metadata_refresh_interval
                            .as_millis()
                            .to_string(),
                    "client.id" => format!("{client_id}-metadata"),
                },
                InTask::Yes,
            )
            .await;

        let consumer = match consumer {
            Ok(consumer) => consumer,
            Err(e) => {
                let msg = format!(
                    "failed creating kafka metadata consumer: {}",
                    e.display_with_causes()
                );
                let status_update = HealthStatusUpdate::halting(msg, None);
                let status = match e {
                    ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
                    _ => HealthStatus::kafka(status_update),
                };
                let error = MetadataUpdate::TransientError(status);
                let timestamp = (config.now_fn)().into();
                metadata_output.give(&metadata_cap, (timestamp, error));

                std::future::pending::<()>().await;
                unreachable!("pending future never returns");
            }
        };

        let (tx, mut rx) = mpsc::unbounded_channel();
        spawn_metadata_thread(config, consumer, topic, tx);

        let mut prev_upstream_frontier = resume_upper;

        while let Some((timestamp, mut update)) = rx.recv().await {
            if prev_upstream_frontier.is_empty() {
                return;
            }

            if let Some(upstream_frontier) = update.upstream_frontier() {
                if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
                    let error = SourceError {
                        error: SourceErrorDetails::Other("topic was recreated".into()),
                    };
                    update = MetadataUpdate::DefiniteError(error);
                }
            }

            if let Some(upstream_frontier) = update.upstream_frontier() {
                prev_upstream_frontier = upstream_frontier.clone();

                let probe = Probe {
                    probe_ts: timestamp,
                    upstream_frontier,
                };
                probe_output.give(&probe_cap, probe);
            }

            metadata_output.give(&metadata_cap, (timestamp, update));
        }
    });

    (metadata_stream, probe_stream, button.press_on_drop())
}

fn spawn_metadata_thread<C: ConsumerContext>(
    config: RawSourceCreationConfig,
    consumer: BaseConsumer<TunnelingClientContext<C>>,
    topic: String,
    tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
) {
    thread::Builder::new()
        .name(format!("kfk-mtdt-{}", config.id))
        .spawn(move || {
            trace!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka metadata thread: starting..."
            );

            let mut ticker = probe::Ticker::new(
                || KAFKA_METADATA_FETCH_INTERVAL.get(config.config.config_set()),
                config.now_fn,
            );

            loop {
                let probe_ts = ticker.tick_blocking();
                let result = fetch_partition_info(
                    &consumer,
                    &topic,
                    config
                        .config
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                );
                trace!(
                    source_id = config.id.to_string(),
                    worker_id = config.worker_id,
                    num_workers = config.worker_count,
                    "kafka metadata thread: metadata fetch result: {:?}",
                    result
                );
                let update = match result {
                    Ok(partitions) => {
                        trace!(
                            source_id = config.id.to_string(),
                            worker_id = config.worker_id,
                            num_workers = config.worker_count,
                            "kafka metadata thread: fetched partition metadata info",
                        );

                        MetadataUpdate::Partitions(partitions)
                    }
                    Err(GetPartitionsError::TopicDoesNotExist) => {
                        let error = SourceError {
                            error: SourceErrorDetails::Other("topic was deleted".into()),
                        };
                        MetadataUpdate::DefiniteError(error)
                    }
                    Err(e) => {
                        let kafka_status = Some(HealthStatusUpdate::stalled(
                            format!("{}", e.display_with_causes()),
                            None,
                        ));

                        let ssh_status = consumer.client().context().tunnel_status();
                        let ssh_status = match ssh_status {
                            SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
                            SshTunnelStatus::Errored(e) => {
                                Some(HealthStatusUpdate::stalled(e, None))
                            }
                        };

                        MetadataUpdate::TransientError(HealthStatus {
                            kafka: kafka_status,
                            ssh: ssh_status,
                        })
                    }
                };

                if tx.send((probe_ts, update)).is_err() {
                    break;
                }
            }

            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka metadata thread: receiver has gone away; shutting down."
            )
        })
        .unwrap();
}