1use std::collections::BTreeMap;
11use std::collections::btree_map::Entry;
12use std::str::{self};
13use std::sync::Arc;
14use std::thread;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use chrono::{DateTime, NaiveDateTime};
19use differential_dataflow::{AsCollection, Hashable};
20use futures::StreamExt;
21use itertools::Itertools;
22use maplit::btreemap;
23use mz_kafka_util::client::{
24 GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
25};
26use mz_ore::assert_none;
27use mz_ore::cast::CastFrom;
28use mz_ore::error::ErrorExt;
29use mz_ore::future::InTask;
30use mz_ore::iter::IteratorExt;
31use mz_repr::adt::timestamp::CheckedTimestamp;
32use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
33use mz_ssh_util::tunnel::SshTunnelStatus;
34use mz_storage_types::errors::{
35 ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
36};
37use mz_storage_types::sources::kafka::{
38 KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
39};
40use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
41use mz_timely_util::antichain::AntichainExt;
42use mz_timely_util::builder_async::{
43 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
44};
45use mz_timely_util::containers::stack::FueledBuilder;
46use mz_timely_util::order::Partitioned;
47use rdkafka::consumer::base_consumer::PartitionQueue;
48use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
49use rdkafka::error::KafkaError;
50use rdkafka::message::{BorrowedMessage, Headers};
51use rdkafka::statistics::Statistics;
52use rdkafka::topic_partition_list::Offset;
53use rdkafka::{ClientContext, Message, TopicPartitionList};
54use serde::{Deserialize, Serialize};
55use timely::PartialOrder;
56use timely::container::CapacityContainerBuilder;
57use timely::dataflow::channels::pact::Pipeline;
58use timely::dataflow::operators::Capability;
59use timely::dataflow::operators::core::Partition;
60use timely::dataflow::operators::vec::Broadcast;
61use timely::dataflow::{Scope, StreamVec};
62use timely::progress::Antichain;
63use timely::progress::Timestamp;
64use tokio::sync::{Notify, mpsc};
65use tracing::{error, info, trace};
66
67use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
68use crate::metrics::source::kafka::KafkaSourceMetrics;
69use crate::source::types::{FuelSize, Probe, SignaledFuture, SourceRender, StackedCollection};
70use crate::source::{RawSourceCreationConfig, SourceMessage, probe};
71use crate::statistics::SourceStatistics;
72
/// Health information split by status namespace.
///
/// Each field holds the most recent update for one namespace, or `None` when
/// there is nothing to report for it. The reader emits `kafka` under
/// `StatusNamespace::Kafka` and `ssh` under `StatusNamespace::Ssh`.
#[derive(
    Clone,
    Debug,
    Default,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize
)]
struct HealthStatus {
    /// Pending update for the Kafka namespace, if any.
    kafka: Option<HealthStatusUpdate>,
    /// Pending update for the SSH namespace, if any.
    ssh: Option<HealthStatusUpdate>,
}
88
89impl HealthStatus {
90 fn kafka(update: HealthStatusUpdate) -> Self {
91 Self {
92 kafka: Some(update),
93 ssh: None,
94 }
95 }
96
97 fn ssh(update: HealthStatusUpdate) -> Self {
98 Self {
99 kafka: None,
100 ssh: Some(update),
101 }
102 }
103}
104
/// Per-worker state used while reading a Kafka topic.
pub struct KafkaSourceReader {
    /// Name of the Kafka topic being read.
    topic_name: String,
    /// Name of the source; only used in log and health messages here.
    source_name: String,
    /// Global ID of the source.
    id: GlobalId,
    /// The shared Kafka consumer used for polling, assignment and positions.
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    /// One consumer per partition queue that this worker has activated.
    partition_consumers: Vec<PartitionConsumer>,
    /// Index of this worker.
    worker_id: usize,
    /// Total number of workers.
    worker_count: usize,
    /// Most recently accepted offset per output index and partition. A freshly
    /// installed partition is seeded with `start_offset - 1`.
    last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
    /// Offset at which reading begins for each partition; partitions without an
    /// entry start at offset 0.
    start_offsets: BTreeMap<PartitionId, i64>,
    /// Receives statistics blobs that are decoded as librdkafka `Statistics`.
    stats_rx: crossbeam_channel::Receiver<Jsonb>,
    /// Metrics that receive the per-partition maximum offset.
    partition_metrics: KafkaSourceMetrics,
    /// Capabilities held per partition, used to emit that partition's data.
    partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
}
134
/// The capability a worker holds for a single partition.
struct PartitionCapability {
    /// Capability used to emit data for the partition; it is downgraded as the
    /// consumer position for that partition advances.
    data: Capability<KafkaTimestamp>,
}
139
/// An offset watermark of a Kafka partition.
// NOTE(review): presumably the per-partition value carried by metadata updates
// and watermark fetches — not used in the visible part of this file; confirm.
type PartitionWatermark = u64;
145
/// Commits resume-upper frontiers back to Kafka and records the committed
/// offsets in the source statistics.
pub struct KafkaResumeUpperProcessor {
    /// Source configuration; used to decide which partitions this worker owns.
    config: RawSourceCreationConfig,
    /// Name of the topic whose offsets are committed.
    topic_name: String,
    /// Consumer through which offsets are committed.
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    /// Statistics objects that receive the committed-offset total.
    statistics: Vec<SourceStatistics>,
}
154
155fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
159 let pid = usize::try_from(pid).expect("positive pid");
160 ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
161}
162
/// Static description of one export (output) of the source.
struct SourceOutputInfo {
    /// Global ID of the export.
    id: GlobalId,
    /// Position of the export among the source exports; used to tag emitted
    /// updates so they can be routed to the right output.
    output_index: usize,
    /// Frontier at which this export resumes.
    resume_upper: Antichain<KafkaTimestamp>,
    /// Kinds of the metadata columns to produce for this export.
    metadata_columns: Vec<KafkaMetadataKind>,
}
169
170impl SourceRender for KafkaSourceConnection {
171 type Time = KafkaTimestamp;
176
177 const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;
178
179 fn render<'scope>(
180 self,
181 scope: Scope<'scope, KafkaTimestamp>,
182 config: &RawSourceCreationConfig,
183 resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
184 start_signal: impl std::future::Future<Output = ()> + 'static,
185 ) -> (
186 BTreeMap<
187 GlobalId,
188 StackedCollection<'scope, KafkaTimestamp, Result<SourceMessage, DataflowError>>,
189 >,
190 StreamVec<'scope, KafkaTimestamp, HealthStatusMessage>,
191 StreamVec<'scope, KafkaTimestamp, Probe<KafkaTimestamp>>,
192 Vec<PressOnDropButton>,
193 ) {
194 let (metadata, probes, metadata_token) =
195 render_metadata_fetcher(scope, self.clone(), config.clone());
196 let (data, health, reader_token) = render_reader(
197 scope,
198 self,
199 config.clone(),
200 resume_uppers,
201 metadata,
202 start_signal,
203 );
204
205 let partition_count = u64::cast_from(config.source_exports.len());
206 let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
207 partition_count,
208 |((output, data), time, diff)| {
209 let output = u64::cast_from(output);
210 (output, (data, time, diff))
211 },
212 );
213 let mut data_collections = BTreeMap::new();
214 for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
215 data_collections.insert(*id, data_stream.as_collection());
216 }
217
218 (
219 data_collections,
220 health,
221 probes,
222 vec![metadata_token, reader_token],
223 )
224 }
225}
226
227fn render_reader<'scope>(
232 scope: Scope<'scope, KafkaTimestamp>,
233 connection: KafkaSourceConnection,
234 config: RawSourceCreationConfig,
235 resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
236 metadata_stream: StreamVec<'scope, KafkaTimestamp, (mz_repr::Timestamp, MetadataUpdate)>,
237 start_signal: impl std::future::Future<Output = ()> + 'static,
238) -> (
239 StackedCollection<'scope, KafkaTimestamp, (usize, Result<SourceMessage, DataflowError>)>,
240 StreamVec<'scope, KafkaTimestamp, HealthStatusMessage>,
241 PressOnDropButton,
242) {
243 let name = format!("KafkaReader({})", config.id);
244 let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
245
246 let (data_output, stream) = builder.new_output::<FueledBuilder<_>>();
247 let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
248
249 let mut metadata_input = builder.new_disconnected_input(metadata_stream.broadcast(), Pipeline);
250
251 let mut outputs = vec![];
252
253 let mut all_export_stats = vec![];
255 let mut snapshot_export_stats = vec![];
256 for (idx, (id, export)) in config.source_exports.iter().enumerate() {
257 let SourceExport {
258 details,
259 storage_metadata: _,
260 data_config: _,
261 } = export;
262 let resume_upper = Antichain::from_iter(
263 config
264 .source_resume_uppers
265 .get(id)
266 .expect("all source exports must be present in source resume uppers")
267 .iter()
268 .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
269 );
270
271 let metadata_columns = match details {
272 SourceExportDetails::Kafka(details) => details
273 .metadata_columns
274 .iter()
275 .map(|(_name, kind)| kind.clone())
276 .collect::<Vec<_>>(),
277 _ => panic!("unexpected source export details: {:?}", details),
278 };
279
280 let statistics = config
281 .statistics
282 .get(id)
283 .expect("statistics have been initialized")
284 .clone();
285 if resume_upper.as_ref() == &[Partitioned::minimum()] {
287 snapshot_export_stats.push(statistics.clone());
288 }
289 all_export_stats.push(statistics);
290
291 let output = SourceOutputInfo {
292 id: *id,
293 resume_upper,
294 output_index: idx,
295 metadata_columns,
296 };
297 outputs.push(output);
298 }
299
300 let busy_signal = Arc::clone(&config.busy_signal);
301 let button = builder.build(move |caps| {
302 SignaledFuture::new(busy_signal, async move {
303 let [mut data_cap, health_cap] = caps.try_into().unwrap();
304
305 let client_id = connection.client_id(
306 config.config.config_set(),
307 &config.config.connection_context,
308 config.id,
309 );
310 let group_id = connection.group_id(&config.config.connection_context, config.id);
311 let KafkaSourceConnection {
312 connection,
313 topic,
314 topic_metadata_refresh_interval,
315 start_offsets,
316 metadata_columns: _,
317 connection_id: _, group_id_prefix: _, } = connection;
322
323 info!(
324 source_id = config.id.to_string(),
325 worker_id = config.worker_id,
326 num_workers = config.worker_count,
327 "instantiating Kafka source reader at offsets {start_offsets:?}"
328 );
329
330 let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
331 let notificator = Arc::new(Notify::new());
332
333 let consumer: Result<BaseConsumer<_>, _> = connection
334 .create_with_context(
335 &config.config,
336 GlueConsumerContext {
337 notificator: Arc::clone(¬ificator),
338 stats_tx,
339 inner: MzClientContext::default(),
340 },
341 &btreemap! {
342 "enable.auto.commit" => "false".into(),
347 "auto.offset.reset" => "earliest".into(),
350 "topic.metadata.refresh.interval.ms" =>
353 topic_metadata_refresh_interval
354 .as_millis()
355 .to_string(),
356 "fetch.message.max.bytes" => "134217728".into(),
358 "group.id" => group_id.clone(),
364 "client.id" => client_id.clone(),
367 },
368 InTask::Yes,
369 )
370 .await;
371
372 let consumer = match consumer {
373 Ok(consumer) => Arc::new(consumer),
374 Err(e) => {
375 let update = HealthStatusUpdate::halting(
376 format!(
377 "failed creating kafka reader consumer: {}",
378 e.display_with_causes()
379 ),
380 None,
381 );
382 health_output.give(
383 &health_cap,
384 HealthStatusMessage {
385 id: None,
386 namespace: if matches!(e, ContextCreationError::Ssh(_)) {
387 StatusNamespace::Ssh
388 } else {
389 StatusNamespace::Kafka
390 },
391 update: update.clone(),
392 },
393 );
394 for (output, update) in outputs.iter().repeat_clone(update) {
395 health_output.give(
396 &health_cap,
397 HealthStatusMessage {
398 id: Some(output.id),
399 namespace: if matches!(e, ContextCreationError::Ssh(_)) {
400 StatusNamespace::Ssh
401 } else {
402 StatusNamespace::Kafka
403 },
404 update,
405 },
406 );
407 }
408 std::future::pending::<()>().await;
412 unreachable!("pending future never returns");
413 }
414 };
415
416 let mut start_offsets: BTreeMap<_, u64> = start_offsets
418 .clone()
419 .into_iter()
420 .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
421 .map(|(pid, offset)| (pid, u64::try_from(offset).expect("start offsets must be non-negative and fit into u64")))
422 .collect();
423
424 let mut partition_capabilities = BTreeMap::new();
425 let mut max_pid = None;
426 let resume_upper = Antichain::from_iter(
427 outputs
428 .iter()
429 .map(|output| output.resume_upper.clone())
430 .flatten(),
431 );
432
433 tracing::info!(
434 source_id = config.id.to_string(),
435 worker_id = config.worker_id,
436 num_workers = config.worker_count,
437 "Kafka source reader starting rehydration with resume upper: {resume_upper:?} and start offsets: {start_offsets:?}"
438 );
439
440 let low_watermarks = fetch_partition_info(
441 &consumer,
442 topic.as_str(),
443 config
444 .config
445 .parameters
446 .kafka_timeout_config
447 .fetch_metadata_timeout,
448 Offset::Beginning, ).unwrap_or_else(|e| {
450 tracing::warn!(
451 source_id = config.id.to_string(),
452 worker_id = config.worker_id,
453 num_workers = config.worker_count,
454 "Failed to fetch watermarks for topic {topic}: {e}"
455 );
456 let update = HealthStatusUpdate::halting(
457 format!(
458 "Failed to fetch watermarks for topic {topic}: {e}"
459 ),
460 None,
461 );
462 health_output.give(
463 &health_cap,
464 HealthStatusMessage {
465 id: None,
466 namespace: StatusNamespace::Kafka,
467 update: update.clone(),
468 },
469 );
470 for (output, update) in outputs.iter().repeat_clone(update) {
471 health_output.give(
472 &health_cap,
473 HealthStatusMessage {
474 id: Some(output.id),
475 namespace: StatusNamespace::Kafka,
476 update,
477 },
478 );
479 }
480 if let GetPartitionsError::TopicDoesNotExist = e {
481 let error = Err(
483 SourceError{
484 error:SourceErrorDetails::Initialization(e.to_string().into())
485 }.into()
486 );
487 let time = data_cap.time().clone();
488 for (output, error) in
489 outputs.iter().map(|o| o.output_index).repeat_clone(error)
490 {
491 let update = ((output, error), time.clone(), Diff::ONE);
492 data_output
493 .give(&data_cap, update);
494 }
495 }
496 BTreeMap::new()
497 });
498
499 for ts in resume_upper.elements() {
500 if let Some(pid) = ts.interval().singleton() {
501 let pid = pid.unwrap_exact();
502 max_pid = std::cmp::max(max_pid, Some(*pid));
503
504 if responsible_for_pid(&config, *pid) {
505 let restored_offset = ts.timestamp().offset;
506 if let Some(start_offset) = start_offsets.get_mut(pid) {
507 *start_offset = std::cmp::max(restored_offset, *start_offset);
508 } else {
509 start_offsets.insert(*pid, restored_offset);
510 }
511
512 let part_ts = Partitioned::new_singleton(
513 RangeBound::exact(*pid),
514 ts.timestamp().clone(),
515 );
516 let part_cap = PartitionCapability {
517 data: data_cap.delayed(&part_ts),
518 };
519 partition_capabilities.insert(*pid, part_cap);
520 }
521 }
522 }
523 let lower = max_pid
524 .map(RangeBound::after)
525 .unwrap_or(RangeBound::NegInfinity);
526 let future_ts =
527 Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
528 data_cap.downgrade(&future_ts);
529
530 for (pid, lwm) in &low_watermarks {
531 if responsible_for_pid(&config, *pid) {
532 if let Some(start_offset) = start_offsets.get_mut(pid) {
539 tracing::info!(
540 source_id = config.id.to_string(),
541 worker_id = config.worker_id,
542 num_workers = config.worker_count,
543 "restored offset {start_offset} for topic {topic} partition {pid} with low watermark {lwm}"
544 );
545 if lwm > start_offset {
546 tracing::error!(
547 source_id = config.id.to_string(),
548 worker_id = config.worker_id,
549 num_workers = config.worker_count,
550 "start offset and resume upper {start_offset} for topic {topic} \
551 partition {pid} is behind the low watermark {lwm}. This likely \
552 means that the offsets have been compacted away by Kafka."
553 );
554 let err_str = format!(
555 "Low watermark {lwm} of kafka topic {topic} partition {pid} \
556 is past the start offset/resume upper: {start_offset} \
557 This likely means that the offsets have been compacted away \
558 by Kafka. Please consider setting a higher start offset or \
559 adjusting your retention policies to prevent this.",
560 );
561
562 let update = HealthStatusUpdate::halting(
563 err_str.clone(),
564 None,
565 );
566 health_output.give(
567 &health_cap,
568 HealthStatusMessage {
569 id: None,
570 namespace: StatusNamespace::Kafka,
571 update: update.clone(),
572 },
573 );
574 let error = Err(
575 SourceError{
576 error:SourceErrorDetails::Initialization(err_str.into())
577 }.into()
578 );
579 let time = data_cap.time().clone();
580 for (output, error) in
581 outputs.iter().map(|o| o.output_index).repeat_clone(error)
582 {
583 let update = ((output, error), time.clone(), Diff::ONE);
584 let size = update.fuel_size();
585 data_output
586 .give_fueled(&data_cap, update, size)
587 .await;
588 }
589 return;
590 }
591 } else {
592 tracing::warn!(
593 source_id = config.id.to_string(),
594 worker_id = config.worker_id,
595 num_workers = config.worker_count,
596 "partition {pid} has a non-zero low watermark {lwm}, but no start offset or \
597 resume upper was found for this partition. Setting start offset to low watermark"
598 );
599 start_offsets.insert(*pid, *lwm);
600 }
601 }
602 }
603
604 start_signal.await;
608 info!(
609 source_id = config.id.to_string(),
610 worker_id = config.worker_id,
611 num_workers = config.worker_count,
612 "kafka worker noticed rehydration is finished, starting partition queues..."
613 );
614
615 let partition_ids = start_offsets.keys().copied().collect();
616 let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);
617 let start_offsets = start_offsets.iter().map(|(pid, offset)| (*pid, i64::try_from(*offset).expect("start offsets must fit into i64"))).collect();
618
619 let mut reader = KafkaSourceReader {
620 topic_name: topic.clone(),
621 source_name: config.name.clone(),
622 id: config.id,
623 partition_consumers: Vec::new(),
624 consumer: Arc::clone(&consumer),
625 worker_id: config.worker_id,
626 worker_count: config.worker_count,
627 last_offsets: outputs
628 .iter()
629 .map(|output| (output.output_index, BTreeMap::new()))
630 .collect(),
631 start_offsets,
632 stats_rx,
633 partition_metrics: config.metrics.get_kafka_source_metrics(
634 partition_ids,
635 topic.clone(),
636 config.id,
637 ),
638 partition_capabilities,
639 };
640
641 let offset_committer = KafkaResumeUpperProcessor {
642 config: config.clone(),
643 topic_name: topic.clone(),
644 consumer,
645 statistics: all_export_stats.clone(),
646 };
647
648 if !snapshot_export_stats.is_empty() {
650 if let Err(e) = offset_committer
651 .process_frontier(resume_upper.clone())
652 .await
653 {
654 offset_commit_metrics.offset_commit_failures.inc();
655 tracing::warn!(
656 %e,
657 "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
658 worker_id = config.worker_id,
659 source_id = config.id,
660 upper = resume_upper.pretty()
661 );
662 }
663 for statistics in config.statistics.values() {
667 statistics.set_snapshot_records_known(0);
668 statistics.set_snapshot_records_staged(0);
669 }
670 }
671
672 let resume_uppers_process_loop = async move {
673 tokio::pin!(resume_uppers);
674 while let Some(frontier) = resume_uppers.next().await {
675 if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
676 offset_commit_metrics.offset_commit_failures.inc();
677 tracing::warn!(
678 %e,
679 "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
680 worker_id = config.worker_id,
681 source_id = config.id,
682 upper = frontier.pretty()
683 );
684 }
685 }
686 std::future::pending::<()>().await;
691 };
692 tokio::pin!(resume_uppers_process_loop);
693
694 let mut metadata_update: Option<MetadataUpdate> = None;
695 let mut snapshot_total = None;
696
697 let max_wait_time =
698 mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
699 loop {
700 tokio::select! {
703 _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},
706
707 _ = metadata_input.ready() => {
708 let mut updates = Vec::new();
710 while let Some(event) = metadata_input.next_sync() {
711 if let Event::Data(_, mut data) = event {
712 updates.append(&mut data);
713 }
714 }
715 metadata_update = updates
716 .into_iter()
717 .max_by_key(|(ts, _)| *ts)
718 .map(|(_, update)| update);
719 }
720
721 _ = resume_uppers_process_loop.as_mut() => {},
725 }
726
727 match metadata_update.take() {
728 Some(MetadataUpdate::Partitions(partitions)) => {
729 let max_pid = partitions.keys().last().cloned();
730 let lower = max_pid
731 .map(RangeBound::after)
732 .unwrap_or(RangeBound::NegInfinity);
733 let future_ts = Partitioned::new_range(
734 lower,
735 RangeBound::PosInfinity,
736 MzOffset::from(0),
737 );
738
739 let mut offset_known = 0;
740 for (&pid, &high_watermark) in &partitions {
741 if responsible_for_pid(&config, pid) {
742 offset_known += high_watermark;
743 reader.ensure_partition(pid);
744 if let Entry::Vacant(entry) =
745 reader.partition_capabilities.entry(pid)
746 {
747 let start_offset = match reader.start_offsets.get(&pid) {
748 Some(&offset) => offset.try_into().unwrap(),
749 None => 0u64,
750 };
751 let part_since_ts = Partitioned::new_singleton(
752 RangeBound::exact(pid),
753 MzOffset::from(start_offset),
754 );
755
756 entry.insert(PartitionCapability {
757 data: data_cap.delayed(&part_since_ts),
758 });
759 }
760 }
761 }
762
763 if !snapshot_export_stats.is_empty() && snapshot_total.is_none() {
766 snapshot_total = Some(offset_known);
770 }
771
772 for output in &outputs {
776 for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
777 health_output.give(
778 &health_cap,
779 HealthStatusMessage {
780 id: Some(output.id),
781 namespace,
782 update: HealthStatusUpdate::running(),
783 },
784 );
785 }
786 }
787 for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
788 health_output.give(
789 &health_cap,
790 HealthStatusMessage {
791 id: None,
792 namespace,
793 update: HealthStatusUpdate::running(),
794 },
795 );
796 }
797
798 for export_stat in all_export_stats.iter() {
799 export_stat.set_offset_known(offset_known);
800 }
801
802 data_cap.downgrade(&future_ts);
803 }
804 Some(MetadataUpdate::TransientError(status)) => {
805 if let Some(update) = status.kafka {
806 health_output.give(
807 &health_cap,
808 HealthStatusMessage {
809 id: None,
810 namespace: StatusNamespace::Kafka,
811 update: update.clone(),
812 },
813 );
814 for (output, update) in outputs.iter().repeat_clone(update) {
815 health_output.give(
816 &health_cap,
817 HealthStatusMessage {
818 id: Some(output.id),
819 namespace: StatusNamespace::Kafka,
820 update,
821 },
822 );
823 }
824 }
825 if let Some(update) = status.ssh {
826 health_output.give(
827 &health_cap,
828 HealthStatusMessage {
829 id: None,
830 namespace: StatusNamespace::Ssh,
831 update: update.clone(),
832 },
833 );
834 for (output, update) in outputs.iter().repeat_clone(update) {
835 health_output.give(
836 &health_cap,
837 HealthStatusMessage {
838 id: Some(output.id),
839 namespace: StatusNamespace::Ssh,
840 update,
841 },
842 );
843 }
844 }
845 }
846 Some(MetadataUpdate::DefiniteError(error)) => {
847 health_output.give(
848 &health_cap,
849 HealthStatusMessage {
850 id: None,
851 namespace: StatusNamespace::Kafka,
852 update: HealthStatusUpdate::stalled(
853 error.to_string(),
854 None,
855 ),
856 },
857 );
858 let error = Err(error.into());
859 let time = data_cap.time().clone();
860 for (output, error) in
861 outputs.iter().map(|o| o.output_index).repeat_clone(error)
862 {
863 let update = ((output, error), time, Diff::ONE);
864 let size = update.fuel_size();
865 data_output
866 .give_fueled(&data_cap, update, size)
867 .await;
868 }
869
870 return;
871 }
872 None => {}
873 }
874
875 while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
883 match result {
884 Err(e) => {
885 let error = format!(
886 "kafka error when polling consumer for source: {} topic: {} : {}",
887 reader.source_name, reader.topic_name, e
888 );
889 let status = HealthStatusUpdate::stalled(error, None);
890 health_output.give(
891 &health_cap,
892 HealthStatusMessage {
893 id: None,
894 namespace: StatusNamespace::Kafka,
895 update: status.clone(),
896 },
897 );
898 for (output, status) in outputs.iter().repeat_clone(status) {
899 health_output.give(
900 &health_cap,
901 HealthStatusMessage {
902 id: Some(output.id),
903 namespace: StatusNamespace::Kafka,
904 update: status,
905 },
906 );
907 }
908 }
909 Ok(message) => {
910 let output_messages = outputs
911 .iter()
912 .map(|output| {
913 let (message, ts) = construct_source_message(
914 &message,
915 &output.metadata_columns,
916 );
917 (output.output_index, message, ts)
918 })
919 .collect::<Vec<_>>();
923 for (output_index, message, ts) in output_messages {
924 if let Some((msg, time, diff)) =
925 reader.handle_message(message, ts, &output_index)
926 {
927 let pid = time.interval().singleton().unwrap().unwrap_exact();
928 let part_cap = &reader.partition_capabilities[pid].data;
929 let msg = msg.map_err(|e| {
930 DataflowError::SourceError(Box::new(SourceError {
931 error: SourceErrorDetails::Other(e.to_string().into()),
932 }))
933 });
934 let update = ((output_index, msg), time, diff);
935 let size = update.fuel_size();
936 data_output
937 .give_fueled(part_cap, update, size)
938 .await;
939 }
940 }
941 }
942 }
943 }
944
945 reader.update_stats();
946
947 let mut consumers = std::mem::take(&mut reader.partition_consumers);
949 for consumer in consumers.iter_mut() {
950 let pid = consumer.pid();
951 let mut partition_exhausted = false;
957 for _ in 0..10_000 {
958 let Some(message) = consumer.get_next_message().transpose() else {
959 partition_exhausted = true;
960 break;
961 };
962
963 for output in outputs.iter() {
964 let message = match &message {
965 Ok((msg, pid)) => {
966 let (msg, ts) =
967 construct_source_message(msg, &output.metadata_columns);
968 assert_eq!(*pid, ts.0);
969 Ok(reader.handle_message(msg, ts, &output.output_index))
970 }
971 Err(err) => Err(err),
972 };
973 match message {
974 Ok(Some((msg, time, diff))) => {
975 let pid = time.interval().singleton().unwrap().unwrap_exact();
976 let part_cap = &reader.partition_capabilities[pid].data;
977 let msg = msg.map_err(|e| {
978 DataflowError::SourceError(Box::new(SourceError {
979 error: SourceErrorDetails::Other(e.to_string().into()),
980 }))
981 });
982 let update =
983 ((output.output_index, msg), time, diff);
984 let size = update.fuel_size();
985 data_output
986 .give_fueled(part_cap, update, size)
987 .await;
988 }
989 Ok(None) => continue,
991 Err(err) => {
992 let last_offset = reader
993 .last_offsets
994 .get(&output.output_index)
995 .expect("output known to be installed")
996 .get(&pid)
997 .expect("partition known to be installed");
998
999 let status = HealthStatusUpdate::stalled(
1000 format!(
1001 "error consuming from source: {} topic: {topic}:\
1002 partition: {pid} last processed offset:\
1003 {last_offset} : {err}",
1004 config.name
1005 ),
1006 None,
1007 );
1008 health_output.give(
1009 &health_cap,
1010 HealthStatusMessage {
1011 id: None,
1012 namespace: StatusNamespace::Kafka,
1013 update: status.clone(),
1014 },
1015 );
1016 health_output.give(
1017 &health_cap,
1018 HealthStatusMessage {
1019 id: Some(output.id),
1020 namespace: StatusNamespace::Kafka,
1021 update: status,
1022 },
1023 );
1024 }
1025 }
1026 }
1027 }
1028 if !partition_exhausted {
1029 notificator.notify_one();
1030 }
1031 }
1032 assert!(reader.partition_consumers.is_empty());
1034 reader.partition_consumers = consumers;
1035
1036 let positions = reader.consumer.position().unwrap();
1037 let topic_positions = positions.elements_for_topic(&reader.topic_name);
1038 let mut snapshot_staged = 0;
1039
1040 for position in topic_positions {
1041 if let Offset::Offset(offset) = position.offset() {
1044 let pid = position.partition();
1045 let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
1046 let upper =
1047 Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);
1048
1049 let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
1050 match part_cap.data.try_downgrade(&upper) {
1051 Ok(()) => {
1052 if !snapshot_export_stats.is_empty() {
1053 snapshot_staged += offset.try_into().unwrap_or(0u64);
1056 if let Some(snapshot_total) = snapshot_total {
1058 snapshot_staged =
1061 std::cmp::min(snapshot_staged, snapshot_total);
1062 }
1063 }
1064 }
1065 Err(_) => {
1066 info!(
1069 source_id = config.id.to_string(),
1070 worker_id = config.worker_id,
1071 num_workers = config.worker_count,
1072 "kafka source frontier downgrade skipped due to already \
1073 seen offset: {:?}",
1074 upper
1075 );
1076 }
1077 };
1078
1079 }
1080 }
1081
1082 if let (Some(snapshot_total), true) =
1083 (snapshot_total, !snapshot_export_stats.is_empty())
1084 {
1085 for export_stat in snapshot_export_stats.iter() {
1086 export_stat.set_snapshot_records_known(snapshot_total);
1087 export_stat.set_snapshot_records_staged(snapshot_staged);
1088 }
1089 if snapshot_total == snapshot_staged {
1090 snapshot_export_stats.clear();
1091 }
1092 }
1093 }
1094 })
1095 });
1096
1097 (
1098 stream.as_collection(),
1099 health_stream,
1100 button.press_on_drop(),
1101 )
1102}
1103
1104impl KafkaResumeUpperProcessor {
1105 async fn process_frontier(
1106 &self,
1107 frontier: Antichain<KafkaTimestamp>,
1108 ) -> Result<(), anyhow::Error> {
1109 use rdkafka::consumer::CommitMode;
1110
1111 let mut offsets = vec![];
1113 let mut offset_committed = 0;
1114 for ts in frontier.iter() {
1115 if let Some(pid) = ts.interval().singleton() {
1116 let pid = pid.unwrap_exact();
1117 if responsible_for_pid(&self.config, *pid) {
1118 offsets.push((pid.clone(), *ts.timestamp()));
1119
1120 offset_committed += ts.timestamp().offset;
1125 }
1126 }
1127 }
1128
1129 for export_stat in self.statistics.iter() {
1130 export_stat.set_offset_committed(offset_committed);
1131 }
1132
1133 if !offsets.is_empty() {
1134 let mut tpl = TopicPartitionList::new();
1135 for (pid, offset) in offsets {
1136 let offset_to_commit =
1137 Offset::Offset(offset.offset.try_into().expect("offset to be vald i64"));
1138 tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
1139 .expect("offset known to be valid");
1140 }
1141 let consumer = Arc::clone(&self.consumer);
1142 mz_ore::task::spawn_blocking(
1143 || format!("source({}) kafka offset commit", self.config.id),
1144 move || consumer.commit(&tpl, CommitMode::Sync),
1145 )
1146 .await?;
1147 }
1148 Ok(())
1149 }
1150}
1151
1152impl KafkaSourceReader {
1153 fn ensure_partition(&mut self, pid: PartitionId) {
1155 if self.last_offsets.is_empty() {
1156 tracing::info!(
1157 source_id = %self.id,
1158 worker_id = %self.worker_id,
1159 "kafka source does not have any outputs, not creating partition queue");
1160
1161 return;
1162 }
1163 for last_offsets in self.last_offsets.values() {
1164 if last_offsets.contains_key(&pid) {
1166 return;
1167 }
1168 }
1169
1170 let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
1171 self.create_partition_queue(pid, Offset::Offset(start_offset));
1172
1173 for last_offsets in self.last_offsets.values_mut() {
1174 let prev = last_offsets.insert(pid, start_offset - 1);
1175 assert_none!(prev);
1176 }
1177 }
1178
    /// Adds `partition_id` to the consumer's assignment, starting at
    /// `initial_offset`, and creates a dedicated partition queue for it.
    ///
    /// # Panics
    ///
    /// Panics if the assignment cannot be read or updated, if a partition queue
    /// cannot be split off, or if the number of assigned partitions for the
    /// topic differs from the number of partition consumers afterwards.
    fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
        info!(
            source_id = self.id.to_string(),
            worker_id = self.worker_id,
            num_workers = self.worker_count,
            "activating Kafka queue for topic {}, partition {}",
            self.topic_name,
            partition_id,
        );

        // Start from the current assignment of this topic ...
        let tpl = self.consumer.assignment().unwrap();
        let mut partition_list = TopicPartitionList::new();
        for partition in tpl.elements_for_topic(&self.topic_name) {
            partition_list
                .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
                .expect("offset known to be valid");
        }
        // ... add the new partition, and install the combined assignment.
        partition_list
            .add_partition_offset(&self.topic_name, partition_id, initial_offset)
            .expect("offset known to be valid");
        self.consumer
            .assign(&partition_list)
            .expect("assignment known to be valid");

        // Re-split the queues of all existing partition consumers and re-register
        // their non-empty callbacks.
        // NOTE(review): presumably required because `assign` invalidates
        // previously split queues — confirm against librdkafka behavior.
        let context = Arc::clone(self.consumer.context());
        for pc in &mut self.partition_consumers {
            pc.partition_queue = self
                .consumer
                .split_partition_queue(&self.topic_name, pc.pid)
                .expect("partition known to be valid");
            pc.partition_queue.set_nonempty_callback({
                let context = Arc::clone(&context);
                move || context.inner().activate()
            });
        }

        // Finally split off the queue of the newly added partition.
        let mut partition_queue = self
            .consumer
            .split_partition_queue(&self.topic_name, partition_id)
            .expect("partition known to be valid");
        partition_queue.set_nonempty_callback(move || context.inner().activate());
        self.partition_consumers
            .push(PartitionConsumer::new(partition_id, partition_queue));
        // Every assigned partition of the topic must have exactly one consumer.
        assert_eq!(
            self.consumer
                .assignment()
                .unwrap()
                .elements_for_topic(&self.topic_name)
                .len(),
            self.partition_consumers.len()
        );
    }
1237
1238 fn update_stats(&mut self) {
1240 while let Ok(stats) = self.stats_rx.try_recv() {
1241 match serde_json::from_str::<Statistics>(&stats.to_string()) {
1242 Ok(statistics) => {
1243 let topic = statistics.topics.get(&self.topic_name);
1244 match topic {
1245 Some(topic) => {
1246 for (id, partition) in &topic.partitions {
1247 self.partition_metrics
1248 .set_offset_max(*id, partition.hi_offset);
1249 }
1250 }
1251 None => error!("No stats found for topic: {}", &self.topic_name),
1252 }
1253 }
1254 Err(e) => {
1255 error!("failed decoding librdkafka statistics JSON: {}", e);
1256 }
1257 }
1258 }
1259 }
1260
1261 fn handle_message(
1264 &mut self,
1265 message: Result<SourceMessage, KafkaHeaderParseError>,
1266 (partition, offset): (PartitionId, MzOffset),
1267 output_index: &usize,
1268 ) -> Option<(
1269 Result<SourceMessage, KafkaHeaderParseError>,
1270 KafkaTimestamp,
1271 Diff,
1272 )> {
1273 assert!(
1285 self.last_offsets
1286 .get(output_index)
1287 .unwrap()
1288 .contains_key(&partition)
1289 );
1290
1291 let last_offset_ref = self
1292 .last_offsets
1293 .get_mut(output_index)
1294 .expect("output known to be installed")
1295 .get_mut(&partition)
1296 .expect("partition known to be installed");
1297
1298 let last_offset = *last_offset_ref;
1299 let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
1300 if offset_as_i64 <= last_offset {
1301 info!(
1302 source_id = self.id.to_string(),
1303 worker_id = self.worker_id,
1304 num_workers = self.worker_count,
1305 "kafka message before expected offset: \
1306 source {} (reading topic {}, partition {}, output {}) \
1307 received offset {} expected offset {:?}",
1308 self.source_name,
1309 self.topic_name,
1310 partition,
1311 output_index,
1312 offset.offset,
1313 last_offset + 1,
1314 );
1315 None
1317 } else {
1318 *last_offset_ref = offset_as_i64;
1319
1320 let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
1321 Some((message, ts, Diff::ONE))
1322 }
1323 }
1324}
1325
1326fn construct_source_message(
1327 msg: &BorrowedMessage<'_>,
1328 metadata_columns: &[KafkaMetadataKind],
1329) -> (
1330 Result<SourceMessage, KafkaHeaderParseError>,
1331 (PartitionId, MzOffset),
1332) {
1333 let pid = msg.partition();
1334 let Ok(offset) = u64::try_from(msg.offset()) else {
1335 panic!(
1336 "got negative offset ({}) from otherwise non-error'd kafka message",
1337 msg.offset()
1338 );
1339 };
1340
1341 let mut metadata = Row::default();
1342 let mut packer = metadata.packer();
1343 for kind in metadata_columns {
1344 match kind {
1345 KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
1346 KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
1347 KafkaMetadataKind::Timestamp => {
1348 let ts = msg
1349 .timestamp()
1350 .to_millis()
1351 .expect("kafka sources always have upstream_time");
1352
1353 let d: Datum = DateTime::from_timestamp_millis(ts)
1354 .and_then(|dt| {
1355 let ct: Option<CheckedTimestamp<NaiveDateTime>> =
1356 dt.naive_utc().try_into().ok();
1357 ct
1358 })
1359 .into();
1360 packer.push(d)
1361 }
1362 KafkaMetadataKind::Header { key, use_bytes } => {
1363 match msg.headers() {
1364 Some(headers) => {
1365 let d = headers
1366 .iter()
1367 .filter(|header| header.key == key)
1368 .last()
1369 .map(|header| match header.value {
1370 Some(v) => {
1371 if *use_bytes {
1372 Ok(Datum::Bytes(v))
1373 } else {
1374 match str::from_utf8(v) {
1375 Ok(str) => Ok(Datum::String(str)),
1376 Err(_) => Err(KafkaHeaderParseError::Utf8Error {
1377 key: key.clone(),
1378 raw: v.to_vec(),
1379 }),
1380 }
1381 }
1382 }
1383 None => Ok(Datum::Null),
1384 })
1385 .unwrap_or_else(|| {
1386 Err(KafkaHeaderParseError::KeyNotFound { key: key.clone() })
1387 });
1388 match d {
1389 Ok(d) => packer.push(d),
1390 Err(err) => return (Err(err), (pid, offset.into())),
1392 }
1393 }
1394 None => packer.push(Datum::Null),
1395 }
1396 }
1397 KafkaMetadataKind::Headers => {
1398 packer.push_list_with(|r| {
1399 if let Some(headers) = msg.headers() {
1400 for header in headers.iter() {
1401 match header.value {
1402 Some(v) => r.push_list_with(|record_row| {
1403 record_row.push(Datum::String(header.key));
1404 record_row.push(Datum::Bytes(v));
1405 }),
1406 None => r.push_list_with(|record_row| {
1407 record_row.push(Datum::String(header.key));
1408 record_row.push(Datum::Null);
1409 }),
1410 }
1411 }
1412 }
1413 });
1414 }
1415 }
1416 }
1417
1418 let key = match msg.key() {
1419 Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1420 None => Row::pack([Datum::Null]),
1421 };
1422 let value = match msg.payload() {
1423 Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1424 None => Row::pack([Datum::Null]),
1425 };
1426 (
1427 Ok(SourceMessage {
1428 key,
1429 value,
1430 metadata,
1431 }),
1432 (pid, offset.into()),
1433 )
1434}
1435
/// Pairs a partition id with the queue that messages for that partition are polled from.
struct PartitionConsumer {
    /// The id of the partition this consumer reads.
    pid: PartitionId,
    /// The queue polled by `get_next_message` for messages of partition `pid`.
    partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
}
1443
1444impl PartitionConsumer {
1445 fn new(
1447 pid: PartitionId,
1448 partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1449 ) -> Self {
1450 PartitionConsumer {
1451 pid,
1452 partition_queue,
1453 }
1454 }
1455
1456 fn get_next_message(&self) -> Result<Option<(BorrowedMessage<'_>, PartitionId)>, KafkaError> {
1463 match self.partition_queue.poll(Duration::from_millis(0)) {
1464 Some(Ok(msg)) => Ok(Some((msg, self.pid))),
1465 Some(Err(err)) => Err(err),
1466 _ => Ok(None),
1467 }
1468 }
1469
1470 fn pid(&self) -> PartitionId {
1472 self.pid
1473 }
1474}
1475
/// Client context that forwards librdkafka statistics over a channel and signals a
/// notifier, while delegating logging and error reporting to an inner `MzClientContext`.
struct GlueConsumerContext {
    /// Signalled by `activate`.
    notificator: Arc<Notify>,
    /// Channel on which decoded statistics payloads are sent by `stats_raw`.
    stats_tx: crossbeam_channel::Sender<Jsonb>,
    /// Context that `log` and `error` calls are delegated to.
    inner: MzClientContext,
}
1483
1484impl ClientContext for GlueConsumerContext {
1485 fn stats_raw(&self, statistics: &[u8]) {
1486 match Jsonb::from_slice(statistics) {
1487 Ok(statistics) => {
1488 self.stats_tx
1489 .send(statistics)
1490 .expect("timely operator hung up while Kafka source active");
1491 self.activate();
1492 }
1493 Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
1494 };
1495 }
1496
1497 fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
1500 self.inner.log(level, fac, log_message)
1501 }
1502 fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
1503 self.inner.error(error, reason)
1504 }
1505}
1506
impl GlueConsumerContext {
    /// Signals `notificator` via `notify_one`.
    ///
    /// NOTE(review): presumably this wakes the operator task that consumes messages and
    /// statistics — confirm against the code that awaits the notifier.
    fn activate(&self) {
        self.notificator.notify_one();
    }
}
1512
// No `ConsumerContext` callbacks are overridden; the trait's default implementations apply.
impl ConsumerContext for GlueConsumerContext {}
1514
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use mz_kafka_util::client::create_new_client_config_simple;
    use rdkafka::consumer::{BaseConsumer, Consumer};
    use rdkafka::{Message, Offset, TopicPartitionList};
    use uuid::Uuid;

    /// Polls both the consumer's common queue and a split-off partition queue until 1,000
    /// messages have been received, and checks that none of them arrived via the common
    /// queue.
    ///
    /// The test connects to `localhost:9092` and reads partition 0 of the topic
    /// `queue-test`.
    ///
    /// NOTE(review): presumably ignored because it needs a running, pre-populated Kafka
    /// broker — confirm.
    #[mz_ore::test]
    #[ignore]
    fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
        let topic_name = "queue-test";
        let pid = 0;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
        kafka_config.set("enable.auto.commit", "false");
        kafka_config.set("group.id", Uuid::new_v4().to_string());
        kafka_config.set("fetch.message.max.bytes", "100");
        let consumer: BaseConsumer<_> = kafka_config.create()?;
        let consumer = Arc::new(consumer);

        let mut partition_list = TopicPartitionList::new();
        partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;
        consumer.assign(&partition_list)?;

        let partition_queue = consumer
            .split_partition_queue(topic_name, pid)
            .expect("missing partition queue");

        let expected_messages = 1_000;
        let mut common_queue_count = 0;
        let mut partition_queue_count = 0;

        loop {
            // First the common queue ...
            match consumer.poll(Duration::from_millis(0)) {
                Some(Ok(msg)) => {
                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
                    if partition_queue_count > 0 {
                        anyhow::bail!(
                            "Got message from common queue after we internally switched to partition queue."
                        );
                    }

                    common_queue_count += 1;
                }
                Some(Err(err)) => anyhow::bail!("{}", err),
                None => (),
            }

            // ... then the dedicated partition queue.
            match partition_queue.poll(Duration::from_millis(0)) {
                Some(Ok(msg)) => {
                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
                    partition_queue_count += 1;
                }
                Some(Err(err)) => anyhow::bail!("{}", err),
                None => (),
            }

            let received = common_queue_count + partition_queue_count;
            if received == expected_messages {
                break;
            }
        }

        assert!(
            common_queue_count == 0,
            "Got {} out of {} messages from common queue. Partition queue: {}",
            common_queue_count,
            expected_messages,
            partition_queue_count
        );

        Ok(())
    }
}
1610
1611fn fetch_partition_info<C: ConsumerContext>(
1613 consumer: &BaseConsumer<C>,
1614 topic: &str,
1615 fetch_timeout: Duration,
1616 offset_requested: Offset,
1617) -> Result<BTreeMap<PartitionId, PartitionWatermark>, GetPartitionsError> {
1618 let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;
1619
1620 let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
1621 for pid in pids {
1622 offset_requests.add_partition_offset(topic, pid, offset_requested)?;
1623 }
1624
1625 let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;
1626
1627 let mut result = BTreeMap::new();
1628 for entry in offset_responses.elements() {
1629 let offset = match entry.offset() {
1630 Offset::Offset(offset) => offset,
1631 offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
1632 };
1633
1634 let pid = entry.partition();
1635 let watermark = offset.try_into().expect("invalid negative offset");
1636 result.insert(pid, watermark);
1637 }
1638
1639 Ok(result)
1640}
1641
/// An update produced by one attempt to fetch the topic's partition metadata.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum MetadataUpdate {
    /// The fetch succeeded; maps each partition id to its watermark.
    Partitions(BTreeMap<PartitionId, PartitionWatermark>),
    /// The fetch failed with an error reported as a health status. Carries no upstream
    /// frontier (see `upstream_frontier`).
    TransientError(HealthStatus),
    /// The fetch failed with a source error. Its upstream frontier is the empty antichain
    /// (see `upstream_frontier`).
    DefiniteError(SourceError),
}
1656
1657impl MetadataUpdate {
1658 fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
1660 match self {
1661 Self::Partitions(partitions) => {
1662 let max_pid = partitions.keys().last().copied();
1663 let lower = max_pid
1664 .map(RangeBound::after)
1665 .unwrap_or(RangeBound::NegInfinity);
1666 let future_ts =
1667 Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
1668
1669 let mut frontier = Antichain::from_elem(future_ts);
1670 for (pid, high_watermark) in partitions {
1671 frontier.insert(Partitioned::new_singleton(
1672 RangeBound::exact(*pid),
1673 MzOffset::from(*high_watermark),
1674 ));
1675 }
1676
1677 Some(frontier)
1678 }
1679 Self::DefiniteError(_) => Some(Antichain::new()),
1680 Self::TransientError(_) => None,
1681 }
1682 }
1683}
1684
/// Errors raised while extracting a single requested header from a Kafka message.
#[derive(Debug, thiserror::Error)]
pub enum KafkaHeaderParseError {
    /// The message carries headers, but none of them has the requested key.
    #[error("A header with key '{key}' was not found in the message headers")]
    KeyNotFound { key: String },
    /// The header was requested as text but its value is not valid UTF-8; `raw` holds the
    /// original bytes.
    #[error(
        "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
    )]
    Utf8Error { key: String, raw: Vec<u8> },
}
1694
1695fn render_metadata_fetcher<'scope>(
1701 scope: Scope<'scope, KafkaTimestamp>,
1702 connection: KafkaSourceConnection,
1703 config: RawSourceCreationConfig,
1704) -> (
1705 StreamVec<'scope, KafkaTimestamp, (mz_repr::Timestamp, MetadataUpdate)>,
1706 StreamVec<'scope, KafkaTimestamp, Probe<KafkaTimestamp>>,
1707 PressOnDropButton,
1708) {
1709 let active_worker_id = usize::cast_from(config.id.hashed());
1710 let is_active_worker = active_worker_id % scope.peers() == scope.index();
1711
1712 let resume_upper = Antichain::from_iter(
1713 config
1714 .source_resume_uppers
1715 .values()
1716 .map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row))
1717 .flatten(),
1718 );
1719
1720 let name = format!("KafkaMetadataFetcher({})", config.id);
1721 let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
1722
1723 let (metadata_output, metadata_stream) =
1724 builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1725 let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1726
1727 let button = builder.build(move |caps| async move {
1728 if !is_active_worker {
1729 return;
1730 }
1731
1732 let [metadata_cap, probe_cap] = caps.try_into().unwrap();
1733
1734 let client_id = connection.client_id(
1735 config.config.config_set(),
1736 &config.config.connection_context,
1737 config.id,
1738 );
1739 let KafkaSourceConnection {
1740 connection,
1741 topic,
1742 topic_metadata_refresh_interval,
1743 ..
1744 } = connection;
1745
1746 let consumer: Result<BaseConsumer<_>, _> = connection
1747 .create_with_context(
1748 &config.config,
1749 MzClientContext::default(),
1750 &btreemap! {
1751 "topic.metadata.refresh.interval.ms" =>
1754 topic_metadata_refresh_interval
1755 .as_millis()
1756 .to_string(),
1757 "client.id" => format!("{client_id}-metadata"),
1760 },
1761 InTask::Yes,
1762 )
1763 .await;
1764
1765 let consumer = match consumer {
1766 Ok(consumer) => consumer,
1767 Err(e) => {
1768 let msg = format!(
1769 "failed creating kafka metadata consumer: {}",
1770 e.display_with_causes()
1771 );
1772 let status_update = HealthStatusUpdate::halting(msg, None);
1773 let status = match e {
1774 ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
1775 _ => HealthStatus::kafka(status_update),
1776 };
1777 let error = MetadataUpdate::TransientError(status);
1778 let timestamp = (config.now_fn)().into();
1779 metadata_output.give(&metadata_cap, (timestamp, error));
1780
1781 std::future::pending::<()>().await;
1785 unreachable!("pending future never returns");
1786 }
1787 };
1788
1789 let (tx, mut rx) = mpsc::unbounded_channel();
1790 spawn_metadata_thread(config, consumer, topic, tx);
1791
1792 let mut prev_upstream_frontier = resume_upper;
1793
1794 while let Some((timestamp, mut update)) = rx.recv().await {
1795 if prev_upstream_frontier.is_empty() {
1796 return;
1797 }
1798
1799 if let Some(upstream_frontier) = update.upstream_frontier() {
1800 if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
1810 let error = SourceError {
1811 error: SourceErrorDetails::Other("topic was recreated".into()),
1812 };
1813 update = MetadataUpdate::DefiniteError(error);
1814 }
1815 }
1816
1817 if let Some(upstream_frontier) = update.upstream_frontier() {
1818 prev_upstream_frontier = upstream_frontier.clone();
1819
1820 let probe = Probe {
1821 probe_ts: timestamp,
1822 upstream_frontier,
1823 };
1824 probe_output.give(&probe_cap, probe);
1825 }
1826
1827 metadata_output.give(&metadata_cap, (timestamp, update));
1828 }
1829 });
1830
1831 (metadata_stream, probe_stream, button.press_on_drop())
1832}
1833
1834fn spawn_metadata_thread<C: ConsumerContext>(
1835 config: RawSourceCreationConfig,
1836 consumer: BaseConsumer<TunnelingClientContext<C>>,
1837 topic: String,
1838 tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
1839) {
1840 thread::Builder::new()
1842 .name(format!("kfk-mtdt-{}", config.id))
1843 .spawn(move || {
1844 trace!(
1845 source_id = config.id.to_string(),
1846 worker_id = config.worker_id,
1847 num_workers = config.worker_count,
1848 "kafka metadata thread: starting..."
1849 );
1850
1851 let timestamp_interval = config.timestamp_interval;
1852 let mut ticker = probe::Ticker::new(move || timestamp_interval, config.now_fn);
1853
1854 loop {
1855 let probe_ts = ticker.tick_blocking();
1856 let result = fetch_partition_info(
1857 &consumer,
1858 &topic,
1859 config
1860 .config
1861 .parameters
1862 .kafka_timeout_config
1863 .fetch_metadata_timeout,
1864 Offset::End,
1865 );
1866 trace!(
1867 source_id = config.id.to_string(),
1868 worker_id = config.worker_id,
1869 num_workers = config.worker_count,
1870 "kafka metadata thread: metadata fetch result: {:?}",
1871 result
1872 );
1873 let update = match result {
1874 Ok(partitions) => {
1875 trace!(
1876 source_id = config.id.to_string(),
1877 worker_id = config.worker_id,
1878 num_workers = config.worker_count,
1879 "kafka metadata thread: fetched partition metadata info",
1880 );
1881
1882 MetadataUpdate::Partitions(partitions)
1883 }
1884 Err(GetPartitionsError::TopicDoesNotExist) => {
1885 let error = SourceError {
1886 error: SourceErrorDetails::Other("topic was deleted".into()),
1887 };
1888 MetadataUpdate::DefiniteError(error)
1889 }
1890 Err(e) => {
1891 let kafka_status = Some(HealthStatusUpdate::stalled(
1892 format!("{}", e.display_with_causes()),
1893 None,
1894 ));
1895
1896 let ssh_status = consumer.client().context().tunnel_status();
1897 let ssh_status = match ssh_status {
1898 SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
1899 SshTunnelStatus::Errored(e) => {
1900 Some(HealthStatusUpdate::stalled(e, None))
1901 }
1902 };
1903
1904 MetadataUpdate::TransientError(HealthStatus {
1905 kafka: kafka_status,
1906 ssh: ssh_status,
1907 })
1908 }
1909 };
1910
1911 if tx.send((probe_ts, update)).is_err() {
1912 break;
1913 }
1914 }
1915
1916 info!(
1917 source_id = config.id.to_string(),
1918 worker_id = config.worker_id,
1919 num_workers = config.worker_count,
1920 "kafka metadata thread: receiver has gone away; shutting down."
1921 )
1922 })
1923 .unwrap();
1924}