1use std::collections::BTreeMap;
11use std::collections::btree_map::Entry;
12use std::str::{self};
13use std::sync::Arc;
14use std::thread;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use chrono::{DateTime, NaiveDateTime};
19use differential_dataflow::{AsCollection, Hashable};
20use futures::StreamExt;
21use itertools::Itertools;
22use maplit::btreemap;
23use mz_kafka_util::client::{
24 GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
25};
26use mz_ore::assert_none;
27use mz_ore::cast::CastFrom;
28use mz_ore::error::ErrorExt;
29use mz_ore::future::InTask;
30use mz_ore::iter::IteratorExt;
31use mz_repr::adt::timestamp::CheckedTimestamp;
32use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
33use mz_ssh_util::tunnel::SshTunnelStatus;
34use mz_storage_types::errors::{
35 ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
36};
37use mz_storage_types::sources::kafka::{
38 KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
39};
40use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
41use mz_timely_util::antichain::AntichainExt;
42use mz_timely_util::builder_async::{
43 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
44};
45use mz_timely_util::containers::stack::FueledBuilder;
46use mz_timely_util::order::Partitioned;
47use rdkafka::consumer::base_consumer::PartitionQueue;
48use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
49use rdkafka::error::KafkaError;
50use rdkafka::message::{BorrowedMessage, Headers};
51use rdkafka::statistics::Statistics;
52use rdkafka::topic_partition_list::Offset;
53use rdkafka::{ClientContext, Message, TopicPartitionList};
54use serde::{Deserialize, Serialize};
55use timely::PartialOrder;
56use timely::container::CapacityContainerBuilder;
57use timely::dataflow::channels::pact::Pipeline;
58use timely::dataflow::operators::Capability;
59use timely::dataflow::operators::core::Partition;
60use timely::dataflow::operators::vec::Broadcast;
61use timely::dataflow::{Scope, StreamVec};
62use timely::progress::Antichain;
63use timely::progress::Timestamp;
64use tokio::sync::{Notify, mpsc};
65use tracing::{error, info, trace};
66
67use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
68use crate::metrics::source::kafka::KafkaSourceMetrics;
69use crate::source::types::{FuelSize, Probe, SignaledFuture, SourceRender, StackedCollection};
70use crate::source::{RawSourceCreationConfig, SourceMessage, probe};
71use crate::statistics::SourceStatistics;
72
/// Health of the Kafka source, split by the status namespace each update belongs to.
///
/// A `None` field means there is no update for that namespace; consumers of this type only
/// emit a health message for the fields that are `Some`.
#[derive(
    Clone,
    Debug,
    Default,
    PartialEq,
    Eq,
    PartialOrd,
    Ord,
    Serialize,
    Deserialize
)]
struct HealthStatus {
    /// Update for the Kafka namespace (`StatusNamespace::Kafka`).
    kafka: Option<HealthStatusUpdate>,
    /// Update for the SSH tunnel namespace (`StatusNamespace::Ssh`).
    ssh: Option<HealthStatusUpdate>,
}
88
89impl HealthStatus {
90 fn kafka(update: HealthStatusUpdate) -> Self {
91 Self {
92 kafka: Some(update),
93 ssh: None,
94 }
95 }
96
97 fn ssh(update: HealthStatusUpdate) -> Self {
98 Self {
99 kafka: None,
100 ssh: Some(update),
101 }
102 }
103}
104
/// Per-worker state used while reading the Kafka topic of a source.
pub struct KafkaSourceReader {
    /// Name of the topic being read.
    topic_name: String,
    /// Name of the source; used in health status messages.
    source_name: String,
    /// Id of the source; used in log messages.
    id: GlobalId,
    /// The Kafka consumer. Partitions are added to its assignment one at a time by
    /// `create_partition_queue`.
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    /// One entry per partition queue split off from `consumer`.
    partition_consumers: Vec<PartitionConsumer>,
    /// Index of this worker; used in log messages.
    worker_id: usize,
    /// Total number of workers; used in log messages.
    worker_count: usize,
    /// For each output index, the last offset handled per partition. A partition's entry is
    /// initialized to one before its start offset when the partition is first ensured.
    last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
    /// The offset each partition starts reading from (missing partitions start at 0).
    start_offsets: BTreeMap<PartitionId, i64>,
    /// Receives librdkafka statistics reports as JSON; drained by `update_stats`.
    stats_rx: crossbeam_channel::Receiver<Jsonb>,
    /// Per-partition metrics; `update_stats` records each partition's max offset here.
    partition_metrics: KafkaSourceMetrics,
    /// The data capability held for each partition this worker reads.
    partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
}
134
/// The capabilities held for a single partition.
struct PartitionCapability {
    /// Capability under which data read from this partition is emitted. After each poll
    /// round the reader tries to downgrade it to the consumer's position for the partition.
    data: Capability<KafkaTimestamp>,
}

/// Presumably a Kafka partition watermark offset.
///
/// NOTE(review): not referenced in the visible part of this file; confirm its users.
type PartitionWatermark = u64;
145
/// Commits the offsets contained in a resume upper back to Kafka and records the committed
/// offset total in the source statistics.
pub struct KafkaResumeUpperProcessor {
    /// Source configuration; used to decide which partitions this worker is responsible for.
    config: RawSourceCreationConfig,
    /// The topic whose partition offsets are committed.
    topic_name: String,
    /// Consumer used for committing; the reader holds another `Arc` to the same consumer.
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    /// Statistics of every export; each receives the committed offset total.
    statistics: Vec<SourceStatistics>,
}
154
155fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
159 let pid = usize::try_from(pid).expect("positive pid");
160 ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
161}
162
/// What the reader needs to know about one export of the source.
struct SourceOutputInfo {
    /// Id of the export.
    id: GlobalId,
    /// Position of the export in `config.source_exports`; data updates for the export are
    /// tagged with this index.
    output_index: usize,
    /// The export's resume upper, decoded from `config.source_resume_uppers`.
    resume_upper: Antichain<KafkaTimestamp>,
    /// Metadata column kinds used when constructing this export's messages.
    metadata_columns: Vec<KafkaMetadataKind>,
}
169
170impl SourceRender for KafkaSourceConnection {
171 type Time = KafkaTimestamp;
176
177 const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;
178
179 fn render<'scope>(
180 self,
181 scope: Scope<'scope, KafkaTimestamp>,
182 config: &RawSourceCreationConfig,
183 resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
184 start_signal: impl std::future::Future<Output = ()> + 'static,
185 ) -> (
186 BTreeMap<
187 GlobalId,
188 StackedCollection<'scope, KafkaTimestamp, Result<SourceMessage, DataflowError>>,
189 >,
190 StreamVec<'scope, KafkaTimestamp, HealthStatusMessage>,
191 StreamVec<'scope, KafkaTimestamp, Probe<KafkaTimestamp>>,
192 Vec<PressOnDropButton>,
193 ) {
194 let (metadata, probes, metadata_token) =
195 render_metadata_fetcher(scope, self.clone(), config.clone());
196 let (data, health, reader_token) = render_reader(
197 scope,
198 self,
199 config.clone(),
200 resume_uppers,
201 metadata,
202 start_signal,
203 );
204
205 let partition_count = u64::cast_from(config.source_exports.len());
206 let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
207 partition_count,
208 |((output, data), time, diff)| {
209 let output = u64::cast_from(output);
210 (output, (data, time, diff))
211 },
212 );
213 let mut data_collections = BTreeMap::new();
214 for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
215 data_collections.insert(*id, data_stream.as_collection());
216 }
217
218 (
219 data_collections,
220 health,
221 probes,
222 vec![metadata_token, reader_token],
223 )
224 }
225}
226
227fn render_reader<'scope>(
232 scope: Scope<'scope, KafkaTimestamp>,
233 connection: KafkaSourceConnection,
234 config: RawSourceCreationConfig,
235 resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
236 metadata_stream: StreamVec<'scope, KafkaTimestamp, (mz_repr::Timestamp, MetadataUpdate)>,
237 start_signal: impl std::future::Future<Output = ()> + 'static,
238) -> (
239 StackedCollection<'scope, KafkaTimestamp, (usize, Result<SourceMessage, DataflowError>)>,
240 StreamVec<'scope, KafkaTimestamp, HealthStatusMessage>,
241 PressOnDropButton,
242) {
243 let name = format!("KafkaReader({})", config.id);
244 let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
245
246 let (data_output, stream) = builder.new_output::<FueledBuilder<_>>();
247 let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
248
249 let mut metadata_input = builder.new_disconnected_input(metadata_stream.broadcast(), Pipeline);
250
251 let mut outputs = vec![];
252
253 let mut all_export_stats = vec![];
255 let mut snapshot_export_stats = vec![];
256 for (idx, (id, export)) in config.source_exports.iter().enumerate() {
257 let SourceExport {
258 details,
259 storage_metadata: _,
260 data_config: _,
261 } = export;
262 let resume_upper = Antichain::from_iter(
263 config
264 .source_resume_uppers
265 .get(id)
266 .expect("all source exports must be present in source resume uppers")
267 .iter()
268 .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
269 );
270
271 let metadata_columns = match details {
272 SourceExportDetails::Kafka(details) => details
273 .metadata_columns
274 .iter()
275 .map(|(_name, kind)| kind.clone())
276 .collect::<Vec<_>>(),
277 _ => panic!("unexpected source export details: {:?}", details),
278 };
279
280 let statistics = config
281 .statistics
282 .get(id)
283 .expect("statistics have been initialized")
284 .clone();
285 if resume_upper.as_ref() == &[Partitioned::minimum()] {
287 snapshot_export_stats.push(statistics.clone());
288 }
289 all_export_stats.push(statistics);
290
291 let output = SourceOutputInfo {
292 id: *id,
293 resume_upper,
294 output_index: idx,
295 metadata_columns,
296 };
297 outputs.push(output);
298 }
299
300 let busy_signal = Arc::clone(&config.busy_signal);
301 let button = builder.build(move |caps| {
302 SignaledFuture::new(busy_signal, async move {
303 let [mut data_cap, health_cap] = caps.try_into().unwrap();
304
305 let client_id = connection.client_id(
306 config.config.config_set(),
307 &config.config.connection_context,
308 config.id,
309 );
310 let group_id = connection.group_id(&config.config.connection_context, config.id);
311 let KafkaSourceConnection {
312 connection,
313 topic,
314 topic_metadata_refresh_interval,
315 start_offsets,
316 metadata_columns: _,
317 connection_id: _, group_id_prefix: _, } = connection;
322
323 info!(
324 source_id = config.id.to_string(),
325 worker_id = config.worker_id,
326 num_workers = config.worker_count,
327 "instantiating Kafka source reader at offsets {start_offsets:?}"
328 );
329
330 let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
331 let notificator = Arc::new(Notify::new());
332
333 let consumer: Result<BaseConsumer<_>, _> = connection
334 .create_with_context(
335 &config.config,
336 GlueConsumerContext {
337 notificator: Arc::clone(¬ificator),
338 stats_tx,
339 inner: MzClientContext::default(),
340 },
341 &btreemap! {
342 "enable.auto.commit" => "false".into(),
347 "auto.offset.reset" => "earliest".into(),
350 "topic.metadata.refresh.interval.ms" =>
353 topic_metadata_refresh_interval
354 .as_millis()
355 .to_string(),
356 "fetch.message.max.bytes" => "134217728".into(),
358 "group.id" => group_id.clone(),
364 "client.id" => client_id.clone(),
367 },
368 InTask::Yes,
369 )
370 .await;
371
372 let consumer = match consumer {
373 Ok(consumer) => Arc::new(consumer),
374 Err(e) => {
375 let update = HealthStatusUpdate::halting(
376 format!(
377 "failed creating kafka reader consumer: {}",
378 e.display_with_causes()
379 ),
380 None,
381 );
382 health_output.give(
383 &health_cap,
384 HealthStatusMessage {
385 id: None,
386 namespace: if matches!(e, ContextCreationError::Ssh(_)) {
387 StatusNamespace::Ssh
388 } else {
389 StatusNamespace::Kafka
390 },
391 update: update.clone(),
392 },
393 );
394 for (output, update) in outputs.iter().repeat_clone(update) {
395 health_output.give(
396 &health_cap,
397 HealthStatusMessage {
398 id: Some(output.id),
399 namespace: if matches!(e, ContextCreationError::Ssh(_)) {
400 StatusNamespace::Ssh
401 } else {
402 StatusNamespace::Kafka
403 },
404 update,
405 },
406 );
407 }
408 std::future::pending::<()>().await;
412 unreachable!("pending future never returns");
413 }
414 };
415
416 let mut start_offsets: BTreeMap<_, u64> = start_offsets
418 .clone()
419 .into_iter()
420 .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
421 .map(|(pid, offset)| (pid, u64::try_from(offset).expect("start offsets must be non-negative and fit into u64")))
422 .collect();
423
424 let mut partition_capabilities = BTreeMap::new();
425 let mut max_pid = None;
426 let resume_upper = Antichain::from_iter(
427 outputs
428 .iter()
429 .map(|output| output.resume_upper.clone())
430 .flatten(),
431 );
432
433 tracing::info!(
434 source_id = config.id.to_string(),
435 worker_id = config.worker_id,
436 num_workers = config.worker_count,
437 "Kafka source reader starting rehydration with resume upper: {resume_upper:?} and start offsets: {start_offsets:?}"
438 );
439
440 for ts in resume_upper.elements() {
441 if let Some(pid) = ts.interval().singleton() {
442 let pid = pid.unwrap_exact();
443 max_pid = std::cmp::max(max_pid, Some(*pid));
444
445 if responsible_for_pid(&config, *pid) {
446 let restored_offset = ts.timestamp().offset;
447 if let Some(start_offset) = start_offsets.get_mut(pid) {
448 *start_offset = std::cmp::max(restored_offset, *start_offset);
449 } else {
450 start_offsets.insert(*pid, restored_offset);
451 }
452
453 let part_ts = Partitioned::new_singleton(
454 RangeBound::exact(*pid),
455 ts.timestamp().clone(),
456 );
457 let part_cap = PartitionCapability {
458 data: data_cap.delayed(&part_ts),
459 };
460 partition_capabilities.insert(*pid, part_cap);
461 }
462 }
463 }
464 let lower = max_pid
465 .map(RangeBound::after)
466 .unwrap_or(RangeBound::NegInfinity);
467 let future_ts =
468 Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
469 data_cap.downgrade(&future_ts);
470
471 if mz_storage_types::dyncfgs::KAFKA_LOW_WATERMARK_CHECK
472 .get(config.config.config_set())
473 {
474 let low_watermarks = fetch_partition_info(
475 &consumer,
476 topic.as_str(),
477 config
478 .config
479 .parameters
480 .kafka_timeout_config
481 .fetch_metadata_timeout,
482 Offset::Beginning, )
484 .unwrap_or_else(|e| {
485 tracing::warn!(
486 source_id = config.id.to_string(),
487 worker_id = config.worker_id,
488 num_workers = config.worker_count,
489 "Failed to fetch watermarks for topic {topic}: {e}"
490 );
491 let update = HealthStatusUpdate::stalled(
492 format!("Failed to fetch watermarks for topic {topic}: {e}"),
493 None,
494 );
495 health_output.give(
496 &health_cap,
497 HealthStatusMessage {
498 id: None,
499 namespace: StatusNamespace::Kafka,
500 update: update.clone(),
501 },
502 );
503 for (output, update) in outputs.iter().repeat_clone(update) {
504 health_output.give(
505 &health_cap,
506 HealthStatusMessage {
507 id: Some(output.id),
508 namespace: StatusNamespace::Kafka,
509 update,
510 },
511 );
512 }
513 let ssh_update = match consumer.client().context().tunnel_status() {
514 SshTunnelStatus::Running => HealthStatusUpdate::running(),
515 SshTunnelStatus::Errored(e) => HealthStatusUpdate::stalled(e, None),
516 };
517 health_output.give(
518 &health_cap,
519 HealthStatusMessage {
520 id: None,
521 namespace: StatusNamespace::Ssh,
522 update: ssh_update.clone(),
523 },
524 );
525 for (output, ssh_update) in outputs.iter().repeat_clone(ssh_update) {
526 health_output.give(
527 &health_cap,
528 HealthStatusMessage {
529 id: Some(output.id),
530 namespace: StatusNamespace::Ssh,
531 update: ssh_update,
532 },
533 );
534 }
535 if let GetPartitionsError::TopicDoesNotExist = e {
536 let error = Err(SourceError {
538 error: SourceErrorDetails::Initialization(e.to_string().into()),
539 }
540 .into());
541 let time = data_cap.time().clone();
542 for (output, error) in
543 outputs.iter().map(|o| o.output_index).repeat_clone(error)
544 {
545 let update = ((output, error), time.clone(), Diff::ONE);
546 data_output.give(&data_cap, update);
547 }
548 }
549 BTreeMap::new()
550 });
551 for (pid, lwm) in &low_watermarks {
552 if responsible_for_pid(&config, *pid) {
553 if let Some(start_offset) = start_offsets.get_mut(pid) {
560 tracing::info!(
561 source_id = config.id.to_string(),
562 worker_id = config.worker_id,
563 num_workers = config.worker_count,
564 "restored offset {start_offset} for topic {topic} partition {pid} with low watermark {lwm}"
565 );
566 if lwm > start_offset {
567 tracing::error!(
568 source_id = config.id.to_string(),
569 worker_id = config.worker_id,
570 num_workers = config.worker_count,
571 "start offset and resume upper {start_offset} for topic {topic} \
572 partition {pid} is behind the low watermark {lwm}. This likely \
573 means that the offsets have been compacted away by Kafka."
574 );
575 let err_str = format!(
576 "Low watermark {lwm} of kafka topic {topic} partition {pid} \
577 is past the start offset/resume upper: {start_offset} \
578 This likely means that the offsets have been compacted away \
579 by Kafka. Please consider setting a higher start offset or \
580 adjusting your retention policies to prevent this.",
581 );
582
583 let update = HealthStatusUpdate::stalled(
584 err_str.clone(),
585 None,
586 );
587 health_output.give(
588 &health_cap,
589 HealthStatusMessage {
590 id: None,
591 namespace: StatusNamespace::Kafka,
592 update: update.clone(),
593 },
594 );
595 let error = Err(
596 SourceError{
597 error:SourceErrorDetails::Initialization(err_str.into())
598 }.into()
599 );
600 let time = data_cap.time().clone();
601 for (output, error) in
602 outputs.iter().map(|o| o.output_index).repeat_clone(error)
603 {
604 let update = ((output, error), time.clone(), Diff::ONE);
605 let size = update.fuel_size();
606 data_output
607 .give_fueled(&data_cap, update, size)
608 .await;
609 }
610 return;
611 }
612 } else {
613 tracing::warn!(
614 source_id = config.id.to_string(),
615 worker_id = config.worker_id,
616 num_workers = config.worker_count,
617 "partition {pid} has a non-zero low watermark {lwm}, but no start offset or \
618 resume upper was found for this partition. Setting start offset to low watermark"
619 );
620 start_offsets.insert(*pid, *lwm);
621 }
622 }
623 }
624 }
625
626 start_signal.await;
630 info!(
631 source_id = config.id.to_string(),
632 worker_id = config.worker_id,
633 num_workers = config.worker_count,
634 "kafka worker noticed rehydration is finished, starting partition queues..."
635 );
636
637 let partition_ids = start_offsets.keys().copied().collect();
638 let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);
639 let start_offsets = start_offsets.iter().map(|(pid, offset)| (*pid, i64::try_from(*offset).expect("start offsets must fit into i64"))).collect();
640
641 let mut reader = KafkaSourceReader {
642 topic_name: topic.clone(),
643 source_name: config.name.clone(),
644 id: config.id,
645 partition_consumers: Vec::new(),
646 consumer: Arc::clone(&consumer),
647 worker_id: config.worker_id,
648 worker_count: config.worker_count,
649 last_offsets: outputs
650 .iter()
651 .map(|output| (output.output_index, BTreeMap::new()))
652 .collect(),
653 start_offsets,
654 stats_rx,
655 partition_metrics: config.metrics.get_kafka_source_metrics(
656 partition_ids,
657 topic.clone(),
658 config.id,
659 ),
660 partition_capabilities,
661 };
662
663 let offset_committer = KafkaResumeUpperProcessor {
664 config: config.clone(),
665 topic_name: topic.clone(),
666 consumer,
667 statistics: all_export_stats.clone(),
668 };
669
670 if !snapshot_export_stats.is_empty() {
672 if let Err(e) = offset_committer
673 .process_frontier(resume_upper.clone())
674 .await
675 {
676 offset_commit_metrics.offset_commit_failures.inc();
677 tracing::warn!(
678 %e,
679 "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
680 worker_id = config.worker_id,
681 source_id = config.id,
682 upper = resume_upper.pretty()
683 );
684 }
685 for statistics in config.statistics.values() {
689 statistics.set_snapshot_records_known(0);
690 statistics.set_snapshot_records_staged(0);
691 }
692 }
693
694 let resume_uppers_process_loop = async move {
695 tokio::pin!(resume_uppers);
696 while let Some(frontier) = resume_uppers.next().await {
697 if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
698 offset_commit_metrics.offset_commit_failures.inc();
699 tracing::warn!(
700 %e,
701 "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
702 worker_id = config.worker_id,
703 source_id = config.id,
704 upper = frontier.pretty()
705 );
706 }
707 }
708 std::future::pending::<()>().await;
713 };
714 tokio::pin!(resume_uppers_process_loop);
715
716 let mut metadata_update: Option<MetadataUpdate> = None;
717 let mut snapshot_total = None;
718
719 let max_wait_time =
720 mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
721 loop {
722 tokio::select! {
725 _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},
728
729 _ = metadata_input.ready() => {
730 let mut updates = Vec::new();
732 while let Some(event) = metadata_input.next_sync() {
733 if let Event::Data(_, mut data) = event {
734 updates.append(&mut data);
735 }
736 }
737 metadata_update = updates
738 .into_iter()
739 .max_by_key(|(ts, _)| *ts)
740 .map(|(_, update)| update);
741 }
742
743 _ = resume_uppers_process_loop.as_mut() => {},
747 }
748
749 match metadata_update.take() {
750 Some(MetadataUpdate::Partitions(partitions)) => {
751 let max_pid = partitions.keys().last().cloned();
752 let lower = max_pid
753 .map(RangeBound::after)
754 .unwrap_or(RangeBound::NegInfinity);
755 let future_ts = Partitioned::new_range(
756 lower,
757 RangeBound::PosInfinity,
758 MzOffset::from(0),
759 );
760
761 let mut offset_known = 0;
762 for (&pid, &high_watermark) in &partitions {
763 if responsible_for_pid(&config, pid) {
764 offset_known += high_watermark;
765 reader.ensure_partition(pid);
766 if let Entry::Vacant(entry) =
767 reader.partition_capabilities.entry(pid)
768 {
769 let start_offset = match reader.start_offsets.get(&pid) {
770 Some(&offset) => offset.try_into().unwrap(),
771 None => 0u64,
772 };
773 let part_since_ts = Partitioned::new_singleton(
774 RangeBound::exact(pid),
775 MzOffset::from(start_offset),
776 );
777
778 entry.insert(PartitionCapability {
779 data: data_cap.delayed(&part_since_ts),
780 });
781 }
782 }
783 }
784
785 if !snapshot_export_stats.is_empty() && snapshot_total.is_none() {
788 snapshot_total = Some(offset_known);
792 }
793
794 for output in &outputs {
798 for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
799 health_output.give(
800 &health_cap,
801 HealthStatusMessage {
802 id: Some(output.id),
803 namespace,
804 update: HealthStatusUpdate::running(),
805 },
806 );
807 }
808 }
809 for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
810 health_output.give(
811 &health_cap,
812 HealthStatusMessage {
813 id: None,
814 namespace,
815 update: HealthStatusUpdate::running(),
816 },
817 );
818 }
819
820 for export_stat in all_export_stats.iter() {
821 export_stat.set_offset_known(offset_known);
822 }
823
824 data_cap.downgrade(&future_ts);
825 }
826 Some(MetadataUpdate::TransientError(status)) => {
827 if let Some(update) = status.kafka {
828 health_output.give(
829 &health_cap,
830 HealthStatusMessage {
831 id: None,
832 namespace: StatusNamespace::Kafka,
833 update: update.clone(),
834 },
835 );
836 for (output, update) in outputs.iter().repeat_clone(update) {
837 health_output.give(
838 &health_cap,
839 HealthStatusMessage {
840 id: Some(output.id),
841 namespace: StatusNamespace::Kafka,
842 update,
843 },
844 );
845 }
846 }
847 if let Some(update) = status.ssh {
848 health_output.give(
849 &health_cap,
850 HealthStatusMessage {
851 id: None,
852 namespace: StatusNamespace::Ssh,
853 update: update.clone(),
854 },
855 );
856 for (output, update) in outputs.iter().repeat_clone(update) {
857 health_output.give(
858 &health_cap,
859 HealthStatusMessage {
860 id: Some(output.id),
861 namespace: StatusNamespace::Ssh,
862 update,
863 },
864 );
865 }
866 }
867 }
868 Some(MetadataUpdate::DefiniteError(error)) => {
869 health_output.give(
870 &health_cap,
871 HealthStatusMessage {
872 id: None,
873 namespace: StatusNamespace::Kafka,
874 update: HealthStatusUpdate::stalled(
875 error.to_string(),
876 None,
877 ),
878 },
879 );
880 let error = Err(error.into());
881 let time = data_cap.time().clone();
882 for (output, error) in
883 outputs.iter().map(|o| o.output_index).repeat_clone(error)
884 {
885 let update = ((output, error), time, Diff::ONE);
886 let size = update.fuel_size();
887 data_output
888 .give_fueled(&data_cap, update, size)
889 .await;
890 }
891
892 return;
893 }
894 None => {}
895 }
896
897 while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
905 match result {
906 Err(e) => {
907 let error = format!(
908 "kafka error when polling consumer for source: {} topic: {} : {}",
909 reader.source_name, reader.topic_name, e
910 );
911 let status = HealthStatusUpdate::stalled(error, None);
912 health_output.give(
913 &health_cap,
914 HealthStatusMessage {
915 id: None,
916 namespace: StatusNamespace::Kafka,
917 update: status.clone(),
918 },
919 );
920 for (output, status) in outputs.iter().repeat_clone(status) {
921 health_output.give(
922 &health_cap,
923 HealthStatusMessage {
924 id: Some(output.id),
925 namespace: StatusNamespace::Kafka,
926 update: status,
927 },
928 );
929 }
930 }
931 Ok(message) => {
932 let output_messages = outputs
933 .iter()
934 .map(|output| {
935 let (message, ts) = construct_source_message(
936 &message,
937 &output.metadata_columns,
938 );
939 (output.output_index, message, ts)
940 })
941 .collect::<Vec<_>>();
945 for (output_index, message, ts) in output_messages {
946 if let Some((msg, time, diff)) =
947 reader.handle_message(message, ts, &output_index)
948 {
949 let pid = time.interval().singleton().unwrap().unwrap_exact();
950 let part_cap = &reader.partition_capabilities[pid].data;
951 let msg = msg.map_err(|e| {
952 DataflowError::SourceError(Box::new(SourceError {
953 error: SourceErrorDetails::Other(e.to_string().into()),
954 }))
955 });
956 let update = ((output_index, msg), time, diff);
957 let size = update.fuel_size();
958 data_output
959 .give_fueled(part_cap, update, size)
960 .await;
961 }
962 }
963 }
964 }
965 }
966
967 reader.update_stats();
968
969 let mut consumers = std::mem::take(&mut reader.partition_consumers);
971 for consumer in consumers.iter_mut() {
972 let pid = consumer.pid();
973 let mut partition_exhausted = false;
979 for _ in 0..10_000 {
980 let Some(message) = consumer.get_next_message().transpose() else {
981 partition_exhausted = true;
982 break;
983 };
984
985 for output in outputs.iter() {
986 let message = match &message {
987 Ok((msg, pid)) => {
988 let (msg, ts) =
989 construct_source_message(msg, &output.metadata_columns);
990 assert_eq!(*pid, ts.0);
991 Ok(reader.handle_message(msg, ts, &output.output_index))
992 }
993 Err(err) => Err(err),
994 };
995 match message {
996 Ok(Some((msg, time, diff))) => {
997 let pid = time.interval().singleton().unwrap().unwrap_exact();
998 let part_cap = &reader.partition_capabilities[pid].data;
999 let msg = msg.map_err(|e| {
1000 DataflowError::SourceError(Box::new(SourceError {
1001 error: SourceErrorDetails::Other(e.to_string().into()),
1002 }))
1003 });
1004 let update =
1005 ((output.output_index, msg), time, diff);
1006 let size = update.fuel_size();
1007 data_output
1008 .give_fueled(part_cap, update, size)
1009 .await;
1010 }
1011 Ok(None) => continue,
1013 Err(err) => {
1014 let last_offset = reader
1015 .last_offsets
1016 .get(&output.output_index)
1017 .expect("output known to be installed")
1018 .get(&pid)
1019 .expect("partition known to be installed");
1020
1021 let status = HealthStatusUpdate::stalled(
1022 format!(
1023 "error consuming from source: {} topic: {topic}:\
1024 partition: {pid} last processed offset:\
1025 {last_offset} : {err}",
1026 config.name
1027 ),
1028 None,
1029 );
1030 health_output.give(
1031 &health_cap,
1032 HealthStatusMessage {
1033 id: None,
1034 namespace: StatusNamespace::Kafka,
1035 update: status.clone(),
1036 },
1037 );
1038 health_output.give(
1039 &health_cap,
1040 HealthStatusMessage {
1041 id: Some(output.id),
1042 namespace: StatusNamespace::Kafka,
1043 update: status,
1044 },
1045 );
1046 }
1047 }
1048 }
1049 }
1050 if !partition_exhausted {
1051 notificator.notify_one();
1052 }
1053 }
1054 assert!(reader.partition_consumers.is_empty());
1056 reader.partition_consumers = consumers;
1057
1058 let positions = reader.consumer.position().unwrap();
1059 let topic_positions = positions.elements_for_topic(&reader.topic_name);
1060 let mut snapshot_staged = 0;
1061
1062 for position in topic_positions {
1063 if let Offset::Offset(offset) = position.offset() {
1066 let pid = position.partition();
1067 let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
1068 let upper =
1069 Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);
1070
1071 let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
1072 match part_cap.data.try_downgrade(&upper) {
1073 Ok(()) => {
1074 if !snapshot_export_stats.is_empty() {
1075 snapshot_staged += offset.try_into().unwrap_or(0u64);
1078 if let Some(snapshot_total) = snapshot_total {
1080 snapshot_staged =
1083 std::cmp::min(snapshot_staged, snapshot_total);
1084 }
1085 }
1086 }
1087 Err(_) => {
1088 info!(
1091 source_id = config.id.to_string(),
1092 worker_id = config.worker_id,
1093 num_workers = config.worker_count,
1094 "kafka source frontier downgrade skipped due to already \
1095 seen offset: {:?}",
1096 upper
1097 );
1098 }
1099 };
1100
1101 }
1102 }
1103
1104 if let (Some(snapshot_total), true) =
1105 (snapshot_total, !snapshot_export_stats.is_empty())
1106 {
1107 for export_stat in snapshot_export_stats.iter() {
1108 export_stat.set_snapshot_records_known(snapshot_total);
1109 export_stat.set_snapshot_records_staged(snapshot_staged);
1110 }
1111 if snapshot_total == snapshot_staged {
1112 snapshot_export_stats.clear();
1113 }
1114 }
1115 }
1116 })
1117 });
1118
1119 (
1120 stream.as_collection(),
1121 health_stream,
1122 button.press_on_drop(),
1123 )
1124}
1125
1126impl KafkaResumeUpperProcessor {
1127 async fn process_frontier(
1128 &self,
1129 frontier: Antichain<KafkaTimestamp>,
1130 ) -> Result<(), anyhow::Error> {
1131 use rdkafka::consumer::CommitMode;
1132
1133 let mut offsets = vec![];
1135 let mut offset_committed = 0;
1136 for ts in frontier.iter() {
1137 if let Some(pid) = ts.interval().singleton() {
1138 let pid = pid.unwrap_exact();
1139 if responsible_for_pid(&self.config, *pid) {
1140 offsets.push((pid.clone(), *ts.timestamp()));
1141
1142 offset_committed += ts.timestamp().offset;
1147 }
1148 }
1149 }
1150
1151 for export_stat in self.statistics.iter() {
1152 export_stat.set_offset_committed(offset_committed);
1153 }
1154
1155 if !offsets.is_empty() {
1156 let mut tpl = TopicPartitionList::new();
1157 for (pid, offset) in offsets {
1158 let offset_to_commit =
1159 Offset::Offset(offset.offset.try_into().expect("offset to be vald i64"));
1160 tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
1161 .expect("offset known to be valid");
1162 }
1163 let consumer = Arc::clone(&self.consumer);
1164 mz_ore::task::spawn_blocking(
1165 || format!("source({}) kafka offset commit", self.config.id),
1166 move || consumer.commit(&tpl, CommitMode::Sync),
1167 )
1168 .await?;
1169 }
1170 Ok(())
1171 }
1172}
1173
impl KafkaSourceReader {
    /// Makes sure a partition queue exists for partition `pid`, creating one if needed.
    ///
    /// Does nothing when the source has no outputs (no entries in `last_offsets`), or
    /// when any output already tracks `pid`. Otherwise the queue starts reading at the
    /// offset recorded in `start_offsets` for `pid` (or 0 if none is recorded). Every
    /// output then records `start_offset - 1` as its last-seen offset, so the first
    /// message at `start_offset` passes the `offset > last_offset` check in
    /// `handle_message`.
    fn ensure_partition(&mut self, pid: PartitionId) {
        if self.last_offsets.is_empty() {
            tracing::info!(
                source_id = %self.id,
                worker_id = %self.worker_id,
                "kafka source does not have any outputs, not creating partition queue");

            return;
        }
        // Already installed for some output: nothing to do.
        for last_offsets in self.last_offsets.values() {
            if last_offsets.contains_key(&pid) {
                return;
            }
        }

        let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
        self.create_partition_queue(pid, Offset::Offset(start_offset));

        // Seed every output with "one before the start offset". The assert enforces
        // that no output had this partition yet.
        for last_offsets in self.last_offsets.values_mut() {
            let prev = last_offsets.insert(pid, start_offset - 1);
            assert_none!(prev);
        }
    }

    /// Adds `partition_id` (starting at `initial_offset`) to the consumer's assignment
    /// and splits off a dedicated queue for it.
    ///
    /// The new assignment is the current assignment's entries for `self.topic_name`
    /// (with the offsets reported by `assignment()`) plus the new partition. After
    /// `assign`, the queues of *all* existing partition consumers are re-split as well.
    /// Every queue gets a non-empty callback that calls `activate()` on the consumer
    /// context.
    ///
    /// # Panics
    ///
    /// Panics if querying or setting the assignment fails, if splitting a queue fails,
    /// or if the final assignment size for the topic differs from the number of
    /// partition consumers.
    fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
        info!(
            source_id = self.id.to_string(),
            worker_id = self.worker_id,
            num_workers = self.worker_count,
            "activating Kafka queue for topic {}, partition {}",
            self.topic_name,
            partition_id,
        );

        // Rebuild the full partition list: `assign` takes the whole set, not a delta.
        let tpl = self.consumer.assignment().unwrap();
        let mut partition_list = TopicPartitionList::new();
        // NOTE(review): offsets here are whatever `assignment()` reports for the
        // already-assigned partitions; presumably librdkafka keeps their fetch
        // positions across the reassign. Confirm.
        for partition in tpl.elements_for_topic(&self.topic_name) {
            partition_list
                .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
                .expect("offset known to be valid");
        }
        partition_list
            .add_partition_offset(&self.topic_name, partition_id, initial_offset)
            .expect("offset known to be valid");
        self.consumer
            .assign(&partition_list)
            .expect("assignment known to be valid");

        // NOTE(review): existing queues are re-split after `assign`, presumably
        // because reassigning detaches previously split partition queues. Confirm
        // against librdkafka semantics.
        let context = Arc::clone(self.consumer.context());
        for pc in &mut self.partition_consumers {
            pc.partition_queue = self
                .consumer
                .split_partition_queue(&self.topic_name, pc.pid)
                .expect("partition known to be valid");
            pc.partition_queue.set_nonempty_callback({
                let context = Arc::clone(&context);
                move || context.inner().activate()
            });
        }

        let mut partition_queue = self
            .consumer
            .split_partition_queue(&self.topic_name, partition_id)
            .expect("partition known to be valid");
        partition_queue.set_nonempty_callback(move || context.inner().activate());
        self.partition_consumers
            .push(PartitionConsumer::new(partition_id, partition_queue));
        // Invariant: exactly one partition consumer per assigned partition of the topic.
        assert_eq!(
            self.consumer
                .assignment()
                .unwrap()
                .elements_for_topic(&self.topic_name)
                .len(),
            self.partition_consumers.len()
        );
    }

    /// Drains all pending librdkafka statistics without blocking.
    ///
    /// For each snapshot, records the per-partition `hi_offset` of `self.topic_name`
    /// in `partition_metrics`. Decode failures and a missing topic entry are logged
    /// and otherwise ignored.
    fn update_stats(&mut self) {
        while let Ok(stats) = self.stats_rx.try_recv() {
            // The Jsonb is rendered back to text so serde can decode rdkafka's
            // `Statistics` struct from it.
            match serde_json::from_str::<Statistics>(&stats.to_string()) {
                Ok(statistics) => {
                    let topic = statistics.topics.get(&self.topic_name);
                    match topic {
                        Some(topic) => {
                            for (id, partition) in &topic.partitions {
                                self.partition_metrics
                                    .set_offset_max(*id, partition.hi_offset);
                            }
                        }
                        None => error!("No stats found for topic: {}", &self.topic_name),
                    }
                }
                Err(e) => {
                    error!("failed decoding librdkafka statistics JSON: {}", e);
                }
            }
        }
    }

    /// Filters and timestamps one decoded message for output `output_index`.
    ///
    /// Returns `None` (after logging) when `offset` is not strictly greater than the
    /// last offset recorded for `(output_index, partition)`, i.e. a replayed message.
    /// Otherwise records `offset` as the new last offset and returns the message with
    /// a singleton timestamp for `partition` at `offset` and a diff of +1.
    ///
    /// # Panics
    ///
    /// Panics if the output or the partition is not installed in `last_offsets`, or if
    /// `offset` does not fit in an `i64`.
    fn handle_message(
        &mut self,
        message: Result<SourceMessage, KafkaHeaderParseError>,
        (partition, offset): (PartitionId, MzOffset),
        output_index: &usize,
    ) -> Option<(
        Result<SourceMessage, KafkaHeaderParseError>,
        KafkaTimestamp,
        Diff,
    )> {
        assert!(
            self.last_offsets
                .get(output_index)
                .unwrap()
                .contains_key(&partition)
        );

        let last_offset_ref = self
            .last_offsets
            .get_mut(output_index)
            .expect("output known to be installed")
            .get_mut(&partition)
            .expect("partition known to be installed");

        let last_offset = *last_offset_ref;
        let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
        if offset_as_i64 <= last_offset {
            // Already emitted this offset (or an earlier one) for this output: drop it.
            info!(
                source_id = self.id.to_string(),
                worker_id = self.worker_id,
                num_workers = self.worker_count,
                "kafka message before expected offset: \
                 source {} (reading topic {}, partition {}, output {}) \
                 received offset {} expected offset {:?}",
                self.source_name,
                self.topic_name,
                partition,
                output_index,
                offset.offset,
                last_offset + 1,
            );
            None
        } else {
            *last_offset_ref = offset_as_i64;

            let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
            Some((message, ts, Diff::ONE))
        }
    }
}
1347
1348fn construct_source_message(
1349 msg: &BorrowedMessage<'_>,
1350 metadata_columns: &[KafkaMetadataKind],
1351) -> (
1352 Result<SourceMessage, KafkaHeaderParseError>,
1353 (PartitionId, MzOffset),
1354) {
1355 let pid = msg.partition();
1356 let Ok(offset) = u64::try_from(msg.offset()) else {
1357 panic!(
1358 "got negative offset ({}) from otherwise non-error'd kafka message",
1359 msg.offset()
1360 );
1361 };
1362
1363 let mut metadata = Row::default();
1364 let mut packer = metadata.packer();
1365 for kind in metadata_columns {
1366 match kind {
1367 KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
1368 KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
1369 KafkaMetadataKind::Timestamp => {
1370 let ts = msg
1371 .timestamp()
1372 .to_millis()
1373 .expect("kafka sources always have upstream_time");
1374
1375 let d: Datum = DateTime::from_timestamp_millis(ts)
1376 .and_then(|dt| {
1377 let ct: Option<CheckedTimestamp<NaiveDateTime>> =
1378 dt.naive_utc().try_into().ok();
1379 ct
1380 })
1381 .into();
1382 packer.push(d)
1383 }
1384 KafkaMetadataKind::Header { key, use_bytes } => {
1385 match msg.headers() {
1386 Some(headers) => {
1387 let d = headers
1388 .iter()
1389 .filter(|header| header.key == key)
1390 .last()
1391 .map(|header| match header.value {
1392 Some(v) => {
1393 if *use_bytes {
1394 Ok(Datum::Bytes(v))
1395 } else {
1396 match str::from_utf8(v) {
1397 Ok(str) => Ok(Datum::String(str)),
1398 Err(_) => Err(KafkaHeaderParseError::Utf8Error {
1399 key: key.clone(),
1400 raw: v.to_vec(),
1401 }),
1402 }
1403 }
1404 }
1405 None => Ok(Datum::Null),
1406 })
1407 .unwrap_or_else(|| {
1408 Err(KafkaHeaderParseError::KeyNotFound { key: key.clone() })
1409 });
1410 match d {
1411 Ok(d) => packer.push(d),
1412 Err(err) => return (Err(err), (pid, offset.into())),
1414 }
1415 }
1416 None => packer.push(Datum::Null),
1417 }
1418 }
1419 KafkaMetadataKind::Headers => {
1420 packer.push_list_with(|r| {
1421 if let Some(headers) = msg.headers() {
1422 for header in headers.iter() {
1423 match header.value {
1424 Some(v) => r.push_list_with(|record_row| {
1425 record_row.push(Datum::String(header.key));
1426 record_row.push(Datum::Bytes(v));
1427 }),
1428 None => r.push_list_with(|record_row| {
1429 record_row.push(Datum::String(header.key));
1430 record_row.push(Datum::Null);
1431 }),
1432 }
1433 }
1434 }
1435 });
1436 }
1437 }
1438 }
1439
1440 let key = match msg.key() {
1441 Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1442 None => Row::pack([Datum::Null]),
1443 };
1444 let value = match msg.payload() {
1445 Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1446 None => Row::pack([Datum::Null]),
1447 };
1448 (
1449 Ok(SourceMessage {
1450 key,
1451 value,
1452 metadata,
1453 }),
1454 (pid, offset.into()),
1455 )
1456}
1457
/// A single Kafka partition paired with the queue split off the shared consumer
/// for that partition.
struct PartitionConsumer {
    /// The partition whose messages arrive on `partition_queue`.
    pid: PartitionId,
    /// Queue obtained from `split_partition_queue`. It is replaced whenever the
    /// consumer's assignment is rebuilt in `create_partition_queue`.
    partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
}
1465
1466impl PartitionConsumer {
1467 fn new(
1469 pid: PartitionId,
1470 partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1471 ) -> Self {
1472 PartitionConsumer {
1473 pid,
1474 partition_queue,
1475 }
1476 }
1477
1478 fn get_next_message(&self) -> Result<Option<(BorrowedMessage<'_>, PartitionId)>, KafkaError> {
1485 match self.partition_queue.poll(Duration::from_millis(0)) {
1486 Some(Ok(msg)) => Ok(Some((msg, self.pid))),
1487 Some(Err(err)) => Err(err),
1488 _ => Ok(None),
1489 }
1490 }
1491
1492 fn pid(&self) -> PartitionId {
1494 self.pid
1495 }
1496}
1497
/// rdkafka client/consumer context that forwards statistics to the source operator
/// and wakes it up.
struct GlueConsumerContext {
    /// Signalled (`notify_one`) by `activate()`.
    notificator: Arc<Notify>,
    /// Receives each statistics snapshot decoded in `stats_raw`.
    stats_tx: crossbeam_channel::Sender<Jsonb>,
    /// Context to which `log` and `error` callbacks are delegated.
    inner: MzClientContext,
}
1505
1506impl ClientContext for GlueConsumerContext {
1507 fn stats_raw(&self, statistics: &[u8]) {
1508 match Jsonb::from_slice(statistics) {
1509 Ok(statistics) => {
1510 self.stats_tx
1511 .send(statistics)
1512 .expect("timely operator hung up while Kafka source active");
1513 self.activate();
1514 }
1515 Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
1516 };
1517 }
1518
1519 fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
1522 self.inner.log(level, fac, log_message)
1523 }
1524 fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
1525 self.inner.error(error, reason)
1526 }
1527}
1528
1529impl GlueConsumerContext {
1530 fn activate(&self) {
1531 self.notificator.notify_one();
1532 }
1533}
1534
1535impl ConsumerContext for GlueConsumerContext {}
1536
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use mz_kafka_util::client::create_new_client_config_simple;
    use rdkafka::consumer::{BaseConsumer, Consumer};
    use rdkafka::{Message, Offset, TopicPartitionList};
    use uuid::Uuid;

    /// Demonstrates whether messages leak onto the consumer's common queue after a
    /// partition queue has been split off.
    ///
    /// Ignored by default: it needs a broker at `localhost:9092` and a topic named
    /// `queue-test`. NOTE(review): the loop only exits once exactly
    /// `expected_messages` (1000) messages have been seen in total. The topic is
    /// presumably pre-populated with exactly that many UTF-8 messages. With fewer,
    /// the test spins forever. Confirm the setup.
    ///
    /// Fails if a common-queue message arrives after the partition queue has started
    /// delivering, or if any message at all came through the common queue.
    #[mz_ore::test]
    #[ignore]
    fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
        let topic_name = "queue-test";
        let pid = 0;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
        kafka_config.set("enable.auto.commit", "false");
        // Fresh group id so no committed offsets from earlier runs apply.
        kafka_config.set("group.id", Uuid::new_v4().to_string());
        // Tiny fetch size, presumably to force many small fetches and widen the race.
        kafka_config.set("fetch.message.max.bytes", "100");
        let consumer: BaseConsumer<_> = kafka_config.create()?;

        let consumer = Arc::new(consumer);

        let mut partition_list = TopicPartitionList::new();
        partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;

        consumer.assign(&partition_list)?;

        let partition_queue = consumer
            .split_partition_queue(topic_name, pid)
            .expect("missing partition queue");

        let expected_messages = 1_000;

        let mut common_queue_count = 0;
        let mut partition_queue_count = 0;

        // Busy-poll both queues with zero timeouts until every message is accounted for.
        loop {
            if let Some(msg) = consumer.poll(Duration::from_millis(0)) {
                match msg {
                    Ok(msg) => {
                        let _payload =
                            std::str::from_utf8(msg.payload().expect("missing payload"))?;
                        if partition_queue_count > 0 {
                            anyhow::bail!(
                                "Got message from common queue after we internally switched to partition queue."
                            );
                        }

                        common_queue_count += 1;
                    }
                    Err(err) => anyhow::bail!("{}", err),
                }
            }

            match partition_queue.poll(Duration::from_millis(0)) {
                Some(Ok(msg)) => {
                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
                    partition_queue_count += 1;
                }
                Some(Err(err)) => anyhow::bail!("{}", err),
                _ => (),
            }

            if (common_queue_count + partition_queue_count) == expected_messages {
                break;
            }
        }

        assert!(
            common_queue_count == 0,
            "Got {} out of {} messages from common queue. Partition queue: {}",
            common_queue_count,
            expected_messages,
            partition_queue_count
        );

        Ok(())
    }
}
1632
1633fn fetch_partition_info<C: ConsumerContext>(
1635 consumer: &BaseConsumer<C>,
1636 topic: &str,
1637 fetch_timeout: Duration,
1638 offset_requested: Offset,
1639) -> Result<BTreeMap<PartitionId, PartitionWatermark>, GetPartitionsError> {
1640 let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;
1641
1642 let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
1643 for pid in pids {
1644 offset_requests.add_partition_offset(topic, pid, offset_requested)?;
1645 }
1646
1647 let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;
1648
1649 let mut result = BTreeMap::new();
1650 for entry in offset_responses.elements() {
1651 let offset = match entry.offset() {
1652 Offset::Offset(offset) => offset,
1653 offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
1654 };
1655
1656 let pid = entry.partition();
1657 let watermark = offset.try_into().expect("invalid negative offset");
1658 result.insert(pid, watermark);
1659 }
1660
1661 Ok(result)
1662}
1663
/// One result of the periodic topic-metadata fetch.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum MetadataUpdate {
    /// Fetch succeeded: every known partition with its fetched high watermark.
    Partitions(BTreeMap<PartitionId, PartitionWatermark>),
    /// Fetch failed in a way that may recover. Carries the health status to report.
    /// Produces no upstream frontier.
    TransientError(HealthStatus),
    /// Unrecoverable error (e.g. topic deleted or recreated). Its upstream frontier is
    /// the empty antichain.
    DefiniteError(SourceError),
}
1678
1679impl MetadataUpdate {
1680 fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
1682 match self {
1683 Self::Partitions(partitions) => {
1684 let max_pid = partitions.keys().last().copied();
1685 let lower = max_pid
1686 .map(RangeBound::after)
1687 .unwrap_or(RangeBound::NegInfinity);
1688 let future_ts =
1689 Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
1690
1691 let mut frontier = Antichain::from_elem(future_ts);
1692 for (pid, high_watermark) in partitions {
1693 frontier.insert(Partitioned::new_singleton(
1694 RangeBound::exact(*pid),
1695 MzOffset::from(*high_watermark),
1696 ));
1697 }
1698
1699 Some(frontier)
1700 }
1701 Self::DefiniteError(_) => Some(Antichain::new()),
1702 Self::TransientError(_) => None,
1703 }
1704 }
1705}
1706
/// Failure to turn a requested message header into a metadata value.
#[derive(Debug, thiserror::Error)]
pub enum KafkaHeaderParseError {
    /// The message has headers, but none with the requested `key`.
    #[error("A header with key '{key}' was not found in the message headers")]
    KeyNotFound { key: String },
    /// The header was requested as text, but its value (`raw`) is not valid UTF-8.
    #[error(
        "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
    )]
    Utf8Error { key: String, raw: Vec<u8> },
}
1716
/// Builds the operator that periodically fetches Kafka topic metadata.
///
/// Only one worker (chosen by hashing the source id) does any work. All others
/// return immediately without emitting anything.
///
/// Returns:
/// - a stream of `(probe timestamp, MetadataUpdate)` pairs;
/// - a stream of `Probe`s, emitted whenever an update implies an upstream frontier;
/// - a button that shuts the operator down when dropped.
///
/// The fetching itself runs on a dedicated thread (`spawn_metadata_thread`).
/// Results arrive here over an unbounded channel.
fn render_metadata_fetcher<'scope>(
    scope: Scope<'scope, KafkaTimestamp>,
    connection: KafkaSourceConnection,
    config: RawSourceCreationConfig,
) -> (
    StreamVec<'scope, KafkaTimestamp, (mz_repr::Timestamp, MetadataUpdate)>,
    StreamVec<'scope, KafkaTimestamp, Probe<KafkaTimestamp>>,
    PressOnDropButton,
) {
    let active_worker_id = usize::cast_from(config.id.hashed());
    let is_active_worker = active_worker_id % scope.peers() == scope.index();

    // Combined resume frontier across all exports. It is the starting point for
    // detecting frontier regressions below.
    let resume_upper = Antichain::from_iter(
        config
            .source_resume_uppers
            .values()
            .map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row))
            .flatten(),
    );

    let name = format!("KafkaMetadataFetcher({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());

    let (metadata_output, metadata_stream) =
        builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();

    let button = builder.build(move |caps| async move {
        if !is_active_worker {
            return;
        }

        let [metadata_cap, probe_cap] = caps.try_into().unwrap();

        let client_id = connection.client_id(
            config.config.config_set(),
            &config.config.connection_context,
            config.id,
        );
        let KafkaSourceConnection {
            connection,
            topic,
            topic_metadata_refresh_interval,
            ..
        } = connection;

        let consumer: Result<BaseConsumer<_>, _> = connection
            .create_with_context(
                &config.config,
                MzClientContext::default(),
                &btreemap! {
                    "topic.metadata.refresh.interval.ms" =>
                        topic_metadata_refresh_interval
                        .as_millis()
                        .to_string(),
                    "client.id" => format!("{client_id}-metadata"),
                },
                InTask::Yes,
            )
            .await;

        let consumer = match consumer {
            Ok(consumer) => consumer,
            Err(e) => {
                // Report a single halting status (attributed to SSH or Kafka) and then
                // park forever. The capabilities stay held, so the outputs never advance.
                let msg = format!(
                    "failed creating kafka metadata consumer: {}",
                    e.display_with_causes()
                );
                let status_update = HealthStatusUpdate::halting(msg, None);
                let status = match e {
                    ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
                    _ => HealthStatus::kafka(status_update),
                };
                let error = MetadataUpdate::TransientError(status);
                let timestamp = (config.now_fn)().into();
                metadata_output.give(&metadata_cap, (timestamp, error));

                std::future::pending::<()>().await;
                unreachable!("pending future never returns");
            }
        };

        let (tx, mut rx) = mpsc::unbounded_channel();
        spawn_metadata_thread(config, consumer, topic, tx);

        let mut prev_upstream_frontier = resume_upper;

        while let Some((timestamp, mut update)) = rx.recv().await {
            // An empty frontier (after a definite error) is final: stop processing.
            if prev_upstream_frontier.is_empty() {
                return;
            }

            // The upstream frontier must never move backwards. A regression is
            // interpreted as the topic having been recreated, which is a definite error.
            if let Some(upstream_frontier) = update.upstream_frontier() {
                if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
                    let error = SourceError {
                        error: SourceErrorDetails::Other("topic was recreated".into()),
                    };
                    update = MetadataUpdate::DefiniteError(error);
                }
            }

            // Recomputed because `update` may have been replaced just above.
            if let Some(upstream_frontier) = update.upstream_frontier() {
                prev_upstream_frontier = upstream_frontier.clone();

                let probe = Probe {
                    probe_ts: timestamp,
                    upstream_frontier,
                };
                probe_output.give(&probe_cap, probe);
            }

            metadata_output.give(&metadata_cap, (timestamp, update));
        }
    });

    (metadata_stream, probe_stream, button.press_on_drop())
}
1855
1856fn spawn_metadata_thread<C: ConsumerContext>(
1857 config: RawSourceCreationConfig,
1858 consumer: BaseConsumer<TunnelingClientContext<C>>,
1859 topic: String,
1860 tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
1861) {
1862 thread::Builder::new()
1864 .name(format!("kfk-mtdt-{}", config.id))
1865 .spawn(move || {
1866 trace!(
1867 source_id = config.id.to_string(),
1868 worker_id = config.worker_id,
1869 num_workers = config.worker_count,
1870 "kafka metadata thread: starting..."
1871 );
1872
1873 let timestamp_interval = config.timestamp_interval;
1874 let mut ticker = probe::Ticker::new(move || timestamp_interval, config.now_fn);
1875
1876 loop {
1877 let probe_ts = ticker.tick_blocking();
1878 let result = fetch_partition_info(
1879 &consumer,
1880 &topic,
1881 config
1882 .config
1883 .parameters
1884 .kafka_timeout_config
1885 .fetch_metadata_timeout,
1886 Offset::End,
1887 );
1888 trace!(
1889 source_id = config.id.to_string(),
1890 worker_id = config.worker_id,
1891 num_workers = config.worker_count,
1892 "kafka metadata thread: metadata fetch result: {:?}",
1893 result
1894 );
1895 let update = match result {
1896 Ok(partitions) => {
1897 trace!(
1898 source_id = config.id.to_string(),
1899 worker_id = config.worker_id,
1900 num_workers = config.worker_count,
1901 "kafka metadata thread: fetched partition metadata info",
1902 );
1903
1904 MetadataUpdate::Partitions(partitions)
1905 }
1906 Err(GetPartitionsError::TopicDoesNotExist) => {
1907 let error = SourceError {
1908 error: SourceErrorDetails::Other("topic was deleted".into()),
1909 };
1910 MetadataUpdate::DefiniteError(error)
1911 }
1912 Err(e) => {
1913 let kafka_status = Some(HealthStatusUpdate::stalled(
1914 format!("{}", e.display_with_causes()),
1915 None,
1916 ));
1917
1918 let ssh_status = consumer.client().context().tunnel_status();
1919 let ssh_status = match ssh_status {
1920 SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
1921 SshTunnelStatus::Errored(e) => {
1922 Some(HealthStatusUpdate::stalled(e, None))
1923 }
1924 };
1925
1926 MetadataUpdate::TransientError(HealthStatus {
1927 kafka: kafka_status,
1928 ssh: ssh_status,
1929 })
1930 }
1931 };
1932
1933 if tx.send((probe_ts, update)).is_err() {
1934 break;
1935 }
1936 }
1937
1938 info!(
1939 source_id = config.id.to_string(),
1940 worker_id = config.worker_id,
1941 num_workers = config.worker_count,
1942 "kafka metadata thread: receiver has gone away; shutting down."
1943 )
1944 })
1945 .unwrap();
1946}