// mz_storage/source/kafka.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::collections::btree_map::Entry;
12use std::str::{self};
13use std::sync::Arc;
14use std::thread;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use chrono::{DateTime, NaiveDateTime};
19use differential_dataflow::{AsCollection, Hashable};
20use futures::StreamExt;
21use itertools::Itertools;
22use maplit::btreemap;
23use mz_kafka_util::client::{
24    GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
25};
26use mz_ore::assert_none;
27use mz_ore::cast::CastFrom;
28use mz_ore::error::ErrorExt;
29use mz_ore::future::InTask;
30use mz_ore::iter::IteratorExt;
31use mz_repr::adt::timestamp::CheckedTimestamp;
32use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
33use mz_ssh_util::tunnel::SshTunnelStatus;
34use mz_storage_types::errors::{
35    ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
36};
37use mz_storage_types::sources::kafka::{
38    KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
39};
40use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
41use mz_timely_util::antichain::AntichainExt;
42use mz_timely_util::builder_async::{
43    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
44};
45use mz_timely_util::containers::stack::AccountedStackBuilder;
46use mz_timely_util::order::Partitioned;
47use rdkafka::consumer::base_consumer::PartitionQueue;
48use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
49use rdkafka::error::KafkaError;
50use rdkafka::message::{BorrowedMessage, Headers};
51use rdkafka::statistics::Statistics;
52use rdkafka::topic_partition_list::Offset;
53use rdkafka::{ClientContext, Message, TopicPartitionList};
54use serde::{Deserialize, Serialize};
55use timely::PartialOrder;
56use timely::container::CapacityContainerBuilder;
57use timely::dataflow::channels::pact::Pipeline;
58use timely::dataflow::operators::Capability;
59use timely::dataflow::operators::core::Partition;
60use timely::dataflow::operators::vec::Broadcast;
61use timely::dataflow::{Scope, StreamVec};
62use timely::progress::Antichain;
63use timely::progress::Timestamp;
64use tokio::sync::{Notify, mpsc};
65use tracing::{error, info, trace};
66
67use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
68use crate::metrics::source::kafka::KafkaSourceMetrics;
69use crate::source::types::{Probe, SignaledFuture, SourceRender, StackedCollection};
70use crate::source::{RawSourceCreationConfig, SourceMessage, probe};
71use crate::statistics::SourceStatistics;
72
73#[derive(
74    Clone,
75    Debug,
76    Default,
77    PartialEq,
78    Eq,
79    PartialOrd,
80    Ord,
81    Serialize,
82    Deserialize
83)]
84struct HealthStatus {
85    kafka: Option<HealthStatusUpdate>,
86    ssh: Option<HealthStatusUpdate>,
87}
88
89impl HealthStatus {
90    fn kafka(update: HealthStatusUpdate) -> Self {
91        Self {
92            kafka: Some(update),
93            ssh: None,
94        }
95    }
96
97    fn ssh(update: HealthStatusUpdate) -> Self {
98        Self {
99            kafka: None,
100            ssh: Some(update),
101        }
102    }
103}
104
/// Contains all state necessary to ingest data from Kafka.
pub struct KafkaSourceReader {
    /// Name of the Kafka topic this source reads from.
    topic_name: String,
    /// Name of the source (will have format kafka-source-id).
    source_name: String,
    /// The `GlobalId` identifying this source.
    id: GlobalId,
    /// The shared Kafka consumer for this source.
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    /// List of consumers. A consumer should be assigned per partition to
    /// guarantee fairness across partitions.
    partition_consumers: Vec<PartitionConsumer>,
    /// The ID of the timely worker running this reader.
    worker_id: usize,
    /// Total count of timely workers.
    worker_count: usize,
    /// The most recently read offset for each partition known to this source
    /// reader, keyed by output-index. An offset of -1 indicates that no prior
    /// message has been read for the given partition.
    last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
    /// The offset to start reading from for each partition.
    start_offsets: BTreeMap<PartitionId, i64>,
    /// Channel to receive Kafka statistics JSON blobs from the stats callback.
    stats_rx: crossbeam_channel::Receiver<Jsonb>,
    /// A handle to the partition-specific metrics for this source.
    partition_metrics: KafkaSourceMetrics,
    /// Per-partition capabilities used to produce messages at
    /// partition-specific timestamps.
    partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
}
134
/// The capability a reader holds for producing data for a single Kafka partition.
struct PartitionCapability {
    /// The capability of the data produced.
    data: Capability<KafkaTimestamp>,
}
139
/// The high watermark offset of a Kafka partition.
///
/// This is the offset of the latest message in the topic/partition available for
/// consumption + 1, i.e. it has frontier semantics over the consumable offsets.
type HighWatermark = u64;
144
/// Processes `resume_uppers` stream updates, committing them upstream and
/// storing them in the `progress_statistics` to be emitted later.
pub struct KafkaResumeUpperProcessor {
    /// The source creation config this processor was built from.
    config: RawSourceCreationConfig,
    /// The Kafka topic that offsets are committed for.
    topic_name: String,
    /// The consumer used to commit offsets upstream.
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    /// Statistics handles for every export of this source.
    statistics: Vec<SourceStatistics>,
}
153
154/// Computes whether this worker is responsible for consuming a partition. It assigns partitions to
155/// workers in a round-robin fashion, starting at an arbitrary worker based on the hash of the
156/// source id.
157fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
158    let pid = usize::try_from(pid).expect("positive pid");
159    ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
160}
161
/// Per-export information needed to render one output of the Kafka source.
struct SourceOutputInfo {
    /// The `GlobalId` of this source export.
    id: GlobalId,
    /// Index of this export among the source's outputs; used to route data to
    /// the correct output stream.
    output_index: usize,
    /// The frontier this export should resume ingestion from.
    resume_upper: Antichain<KafkaTimestamp>,
    /// The kinds of metadata columns requested for this export.
    metadata_columns: Vec<KafkaMetadataKind>,
}
168
169impl SourceRender for KafkaSourceConnection {
170    // TODO(petrosagg): The type used for the partition (RangeBound<PartitionId>) doesn't need to
171    // be so complicated and we could instead use `Partitioned<PartitionId, Option<u64>>` where all
172    // ranges are inclusive and a time of `None` signifies that a particular partition is not
173    // present. This requires an shard migration of the remap shard.
174    type Time = KafkaTimestamp;
175
176    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;
177
178    fn render<G: Scope<Timestamp = KafkaTimestamp>>(
179        self,
180        scope: &mut G,
181        config: &RawSourceCreationConfig,
182        resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
183        start_signal: impl std::future::Future<Output = ()> + 'static,
184    ) -> (
185        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
186        StreamVec<G, HealthStatusMessage>,
187        StreamVec<G, Probe<KafkaTimestamp>>,
188        Vec<PressOnDropButton>,
189    ) {
190        let (metadata, probes, metadata_token) =
191            render_metadata_fetcher(scope, self.clone(), config.clone());
192        let (data, health, reader_token) = render_reader(
193            scope,
194            self,
195            config.clone(),
196            resume_uppers,
197            metadata,
198            start_signal,
199        );
200
201        let partition_count = u64::cast_from(config.source_exports.len());
202        let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
203            partition_count,
204            |((output, data), time, diff)| {
205                let output = u64::cast_from(*output);
206                (output, (data.clone(), time.clone(), diff.clone()))
207            },
208        );
209        let mut data_collections = BTreeMap::new();
210        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
211            data_collections.insert(*id, data_stream.as_collection());
212        }
213
214        (
215            data_collections,
216            health,
217            probes,
218            vec![metadata_token, reader_token],
219        )
220    }
221}
222
223/// Render the reader of a Kafka source.
224///
225/// The reader is responsible for polling the Kafka topic partitions for new messages, and
226/// transforming them into a `SourceMessage` collection.
227fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
228    scope: &G,
229    connection: KafkaSourceConnection,
230    config: RawSourceCreationConfig,
231    resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
232    metadata_stream: StreamVec<G, (mz_repr::Timestamp, MetadataUpdate)>,
233    start_signal: impl std::future::Future<Output = ()> + 'static,
234) -> (
235    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
236    StreamVec<G, HealthStatusMessage>,
237    PressOnDropButton,
238) {
239    let name = format!("KafkaReader({})", config.id);
240    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
241
242    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
243    let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
244
245    let mut metadata_input = builder.new_disconnected_input(metadata_stream.broadcast(), Pipeline);
246
247    let mut outputs = vec![];
248
249    // Contains the `SourceStatistics` entries for exports that require a snapshot.
250    let mut all_export_stats = vec![];
251    let mut snapshot_export_stats = vec![];
252    for (idx, (id, export)) in config.source_exports.iter().enumerate() {
253        let SourceExport {
254            details,
255            storage_metadata: _,
256            data_config: _,
257        } = export;
258        let resume_upper = Antichain::from_iter(
259            config
260                .source_resume_uppers
261                .get(id)
262                .expect("all source exports must be present in source resume uppers")
263                .iter()
264                .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
265        );
266
267        let metadata_columns = match details {
268            SourceExportDetails::Kafka(details) => details
269                .metadata_columns
270                .iter()
271                .map(|(_name, kind)| kind.clone())
272                .collect::<Vec<_>>(),
273            _ => panic!("unexpected source export details: {:?}", details),
274        };
275
276        let statistics = config
277            .statistics
278            .get(id)
279            .expect("statistics have been initialized")
280            .clone();
281        // export requires snapshot
282        if resume_upper.as_ref() == &[Partitioned::minimum()] {
283            snapshot_export_stats.push(statistics.clone());
284        }
285        all_export_stats.push(statistics);
286
287        let output = SourceOutputInfo {
288            id: *id,
289            resume_upper,
290            output_index: idx,
291            metadata_columns,
292        };
293        outputs.push(output);
294    }
295
296    let busy_signal = Arc::clone(&config.busy_signal);
297    let button = builder.build(move |caps| {
298        SignaledFuture::new(busy_signal, async move {
299            let [mut data_cap, health_cap] = caps.try_into().unwrap();
300
301            let client_id = connection.client_id(
302                config.config.config_set(),
303                &config.config.connection_context,
304                config.id,
305            );
306            let group_id = connection.group_id(&config.config.connection_context, config.id);
307            let KafkaSourceConnection {
308                connection,
309                topic,
310                topic_metadata_refresh_interval,
311                start_offsets,
312                metadata_columns: _,
313                // Exhaustive match protects against forgetting to apply an
314                // option. Ignored fields are justified below.
315                connection_id: _,   // not needed here
316                group_id_prefix: _, // used above via `connection.group_id`
317            } = connection;
318
319            // Start offsets is a map from partition to the next offset to read from.
320            let mut start_offsets: BTreeMap<_, i64> = start_offsets
321                .clone()
322                .into_iter()
323                .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
324                .map(|(k, v)| (k, v))
325                .collect();
326
327            let mut partition_capabilities = BTreeMap::new();
328            let mut max_pid = None;
329            let resume_upper = Antichain::from_iter(
330                outputs
331                    .iter()
332                    .map(|output| output.resume_upper.clone())
333                    .flatten(),
334            );
335
336            for ts in resume_upper.elements() {
337                if let Some(pid) = ts.interval().singleton() {
338                    let pid = pid.unwrap_exact();
339                    max_pid = std::cmp::max(max_pid, Some(*pid));
340                    if responsible_for_pid(&config, *pid) {
341                        let restored_offset = i64::try_from(ts.timestamp().offset)
342                            .expect("restored kafka offsets must fit into i64");
343                        if let Some(start_offset) = start_offsets.get_mut(pid) {
344                            *start_offset = std::cmp::max(restored_offset, *start_offset);
345                        } else {
346                            start_offsets.insert(*pid, restored_offset);
347                        }
348
349                        let part_ts = Partitioned::new_singleton(
350                            RangeBound::exact(*pid),
351                            ts.timestamp().clone(),
352                        );
353                        let part_cap = PartitionCapability {
354                            data: data_cap.delayed(&part_ts),
355                        };
356                        partition_capabilities.insert(*pid, part_cap);
357                    }
358                }
359            }
360            let lower = max_pid
361                .map(RangeBound::after)
362                .unwrap_or(RangeBound::NegInfinity);
363            let future_ts =
364                Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
365            data_cap.downgrade(&future_ts);
366
367            info!(
368                source_id = config.id.to_string(),
369                worker_id = config.worker_id,
370                num_workers = config.worker_count,
371                "instantiating Kafka source reader at offsets {start_offsets:?}"
372            );
373
374            let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
375            let notificator = Arc::new(Notify::new());
376
377            let consumer: Result<BaseConsumer<_>, _> = connection
378                .create_with_context(
379                    &config.config,
380                    GlueConsumerContext {
381                        notificator: Arc::clone(&notificator),
382                        stats_tx,
383                        inner: MzClientContext::default(),
384                    },
385                    &btreemap! {
386                        // Disable Kafka auto commit. We manually commit offsets
387                        // to Kafka once we have reclocked those offsets, so
388                        // that users can use standard Kafka tools for progress
389                        // tracking.
390                        "enable.auto.commit" => "false".into(),
391                        // Always begin ingest at 0 when restarted, even if Kafka
392                        // contains committed consumer read offsets
393                        "auto.offset.reset" => "earliest".into(),
394                        // Use the user-configured topic metadata refresh
395                        // interval.
396                        "topic.metadata.refresh.interval.ms" =>
397                            topic_metadata_refresh_interval
398                            .as_millis()
399                            .to_string(),
400                        // TODO: document the rationale for this.
401                        "fetch.message.max.bytes" => "134217728".into(),
402                        // Consumer group ID, which may have been overridden by
403                        // the user. librdkafka requires this, and we use offset
404                        // committing to provide a way for users to monitor
405                        // ingest progress, though we do not rely on the
406                        // committed offsets for any functionality.
407                        "group.id" => group_id.clone(),
408                        // Allow Kafka monitoring tools to identify this
409                        // consumer.
410                        "client.id" => client_id.clone(),
411                    },
412                    InTask::Yes,
413                )
414                .await;
415
416            let consumer = match consumer {
417                Ok(consumer) => Arc::new(consumer),
418                Err(e) => {
419                    let update = HealthStatusUpdate::halting(
420                        format!(
421                            "failed creating kafka reader consumer: {}",
422                            e.display_with_causes()
423                        ),
424                        None,
425                    );
426                    health_output.give(
427                        &health_cap,
428                        HealthStatusMessage {
429                            id: None,
430                            namespace: if matches!(e, ContextCreationError::Ssh(_)) {
431                                StatusNamespace::Ssh
432                            } else {
433                                StatusNamespace::Kafka
434                            },
435                            update: update.clone(),
436                        },
437                    );
438                    for (output, update) in outputs.iter().repeat_clone(update) {
439                        health_output.give(
440                            &health_cap,
441                            HealthStatusMessage {
442                                id: Some(output.id),
443                                namespace: if matches!(e, ContextCreationError::Ssh(_)) {
444                                    StatusNamespace::Ssh
445                                } else {
446                                    StatusNamespace::Kafka
447                                },
448                                update,
449                            },
450                        );
451                    }
452                    // IMPORTANT: wedge forever until the `SuspendAndRestart` is processed.
453                    // Returning would incorrectly present to the remap operator as progress to the
454                    // empty frontier which would be incorrectly recorded to the remap shard.
455                    std::future::pending::<()>().await;
456                    unreachable!("pending future never returns");
457                }
458            };
459
460            // Note that we wait for this AFTER we downgrade to the source `resume_upper`. This
461            // allows downstream operators (namely, the `reclock_operator`) to downgrade to the
462            // `resume_upper`, which is necessary for this basic form of backpressure to work.
463            start_signal.await;
464            info!(
465                source_id = config.id.to_string(),
466                worker_id = config.worker_id,
467                num_workers = config.worker_count,
468                "kafka worker noticed rehydration is finished, starting partition queues..."
469            );
470
471            let partition_ids = start_offsets.keys().copied().collect();
472            let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);
473
474            let mut reader = KafkaSourceReader {
475                topic_name: topic.clone(),
476                source_name: config.name.clone(),
477                id: config.id,
478                partition_consumers: Vec::new(),
479                consumer: Arc::clone(&consumer),
480                worker_id: config.worker_id,
481                worker_count: config.worker_count,
482                last_offsets: outputs
483                    .iter()
484                    .map(|output| (output.output_index, BTreeMap::new()))
485                    .collect(),
486                start_offsets,
487                stats_rx,
488                partition_metrics: config.metrics.get_kafka_source_metrics(
489                    partition_ids,
490                    topic.clone(),
491                    config.id,
492                ),
493                partition_capabilities,
494            };
495
496            let offset_committer = KafkaResumeUpperProcessor {
497                config: config.clone(),
498                topic_name: topic.clone(),
499                consumer,
500                statistics: all_export_stats.clone(),
501            };
502
503            // Seed the progress metrics with `0` if we are snapshotting.
504            if !snapshot_export_stats.is_empty() {
505                if let Err(e) = offset_committer
506                    .process_frontier(resume_upper.clone())
507                    .await
508                {
509                    offset_commit_metrics.offset_commit_failures.inc();
510                    tracing::warn!(
511                        %e,
512                        "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
513                        worker_id = config.worker_id,
514                        source_id = config.id,
515                        upper = resume_upper.pretty()
516                    );
517                }
518                // Reset snapshot statistics for any exports that are not involved
519                // in this round of snapshotting. Those that are snapshotting this round will
520                // see updates as the snapshot commences.
521                for statistics in config.statistics.values() {
522                    statistics.set_snapshot_records_known(0);
523                    statistics.set_snapshot_records_staged(0);
524                }
525            }
526
527            let resume_uppers_process_loop = async move {
528                tokio::pin!(resume_uppers);
529                while let Some(frontier) = resume_uppers.next().await {
530                    if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
531                        offset_commit_metrics.offset_commit_failures.inc();
532                        tracing::warn!(
533                            %e,
534                            "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
535                            worker_id = config.worker_id,
536                            source_id = config.id,
537                            upper = frontier.pretty()
538                        );
539                    }
540                }
541                // During dataflow shutdown this loop can end due to the general chaos caused by
542                // dropping tokens as a means to shutdown. This call ensures this future never ends
543                // and we instead rely on this operator being dropped altogether when *its* token
544                // is dropped.
545                std::future::pending::<()>().await;
546            };
547            tokio::pin!(resume_uppers_process_loop);
548
549            let mut metadata_update: Option<MetadataUpdate> = None;
550            let mut snapshot_total = None;
551
552            let max_wait_time =
553                mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
554            loop {
555                // Wait for data or metadata events while also making progress with offset
556                // committing.
557                tokio::select! {
558                    // TODO(petrosagg): remove the timeout and rely purely on librdkafka waking us
559                    // up
560                    _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},
561
562                    _ = metadata_input.ready() => {
563                        // Collect all pending updates, then only keep the most recent one.
564                        let mut updates = Vec::new();
565                        while let Some(event) = metadata_input.next_sync() {
566                            if let Event::Data(_, mut data) = event {
567                                updates.append(&mut data);
568                            }
569                        }
570                        metadata_update = updates
571                            .into_iter()
572                            .max_by_key(|(ts, _)| *ts)
573                            .map(|(_, update)| update);
574                    }
575
576                    // This future is not cancel safe but we are only passing a reference to it in
577                    // the select! loop so the future stays on the stack and never gets cancelled
578                    // until the end of the function.
579                    _ = resume_uppers_process_loop.as_mut() => {},
580                }
581
582                match metadata_update.take() {
583                    Some(MetadataUpdate::Partitions(partitions)) => {
584                        let max_pid = partitions.keys().last().cloned();
585                        let lower = max_pid
586                            .map(RangeBound::after)
587                            .unwrap_or(RangeBound::NegInfinity);
588                        let future_ts = Partitioned::new_range(
589                            lower,
590                            RangeBound::PosInfinity,
591                            MzOffset::from(0),
592                        );
593
594                        let mut offset_known = 0;
595                        for (&pid, &high_watermark) in &partitions {
596                            if responsible_for_pid(&config, pid) {
597                                offset_known += high_watermark;
598                                reader.ensure_partition(pid);
599                                if let Entry::Vacant(entry) =
600                                    reader.partition_capabilities.entry(pid)
601                                {
602                                    let start_offset = match reader.start_offsets.get(&pid) {
603                                        Some(&offset) => offset.try_into().unwrap(),
604                                        None => 0u64,
605                                    };
606                                    let part_since_ts = Partitioned::new_singleton(
607                                        RangeBound::exact(pid),
608                                        MzOffset::from(start_offset),
609                                    );
610
611                                    entry.insert(PartitionCapability {
612                                        data: data_cap.delayed(&part_since_ts),
613                                    });
614                                }
615                            }
616                        }
617
618                        // If we are snapshotting, record our first set of partitions as the snapshot
619                        // size.
620                        if !snapshot_export_stats.is_empty() && snapshot_total.is_none() {
621                            // Note that we want to represent the _number of offsets_, which
622                            // means the watermark's frontier semantics is correct, without
623                            // subtracting (Kafka offsets start at 0).
624                            snapshot_total = Some(offset_known);
625                        }
626
627                        // Clear all the health namespaces we know about.
628                        // Note that many kafka sources's don't have an ssh tunnel, but the
629                        // `health_operator` handles this fine.
630                        for output in &outputs {
631                            for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
632                                health_output.give(
633                                    &health_cap,
634                                    HealthStatusMessage {
635                                        id: Some(output.id),
636                                        namespace,
637                                        update: HealthStatusUpdate::running(),
638                                    },
639                                );
640                            }
641                        }
642                        for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
643                            health_output.give(
644                                &health_cap,
645                                HealthStatusMessage {
646                                    id: None,
647                                    namespace,
648                                    update: HealthStatusUpdate::running(),
649                                },
650                            );
651                        }
652
653                        for export_stat in all_export_stats.iter() {
654                            export_stat.set_offset_known(offset_known);
655                        }
656
657                        data_cap.downgrade(&future_ts);
658                    }
659                    Some(MetadataUpdate::TransientError(status)) => {
660                        if let Some(update) = status.kafka {
661                            health_output.give(
662                                &health_cap,
663                                HealthStatusMessage {
664                                    id: None,
665                                    namespace: StatusNamespace::Kafka,
666                                    update: update.clone(),
667                                },
668                            );
669                            for (output, update) in outputs.iter().repeat_clone(update) {
670                                health_output.give(
671                                    &health_cap,
672                                    HealthStatusMessage {
673                                        id: Some(output.id),
674                                        namespace: StatusNamespace::Kafka,
675                                        update,
676                                    },
677                                );
678                            }
679                        }
680                        if let Some(update) = status.ssh {
681                            health_output.give(
682                                &health_cap,
683                                HealthStatusMessage {
684                                    id: None,
685                                    namespace: StatusNamespace::Ssh,
686                                    update: update.clone(),
687                                },
688                            );
689                            for (output, update) in outputs.iter().repeat_clone(update) {
690                                health_output.give(
691                                    &health_cap,
692                                    HealthStatusMessage {
693                                        id: Some(output.id),
694                                        namespace: StatusNamespace::Ssh,
695                                        update,
696                                    },
697                                );
698                            }
699                        }
700                    }
701                    Some(MetadataUpdate::DefiniteError(error)) => {
702                        health_output.give(
703                            &health_cap,
704                            HealthStatusMessage {
705                                id: None,
706                                namespace: StatusNamespace::Kafka,
707                                update: HealthStatusUpdate::stalled(
708                                    error.to_string(),
709                                    None,
710                                ),
711                            },
712                        );
713                        let error = Err(error.into());
714                        let time = data_cap.time().clone();
715                        for (output, error) in
716                            outputs.iter().map(|o| o.output_index).repeat_clone(error)
717                        {
718                            data_output
719                                .give_fueled(&data_cap, ((output, error), time, Diff::ONE))
720                                .await;
721                        }
722
723                        return;
724                    }
725                    None => {}
726                }
727
728                // Poll the consumer once. We split the consumer's partitions out into separate
729                // queues and poll those individually, but it's still necessary to drive logic that
730                // consumes from rdkafka's internal event queue, such as statistics callbacks.
731                //
732                // Additionally, assigning topics and splitting them off into separate queues is
733                // not atomic, so we expect to see at least some messages to show up when polling
734                // the consumer directly.
735                while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
736                    match result {
737                        Err(e) => {
738                            let error = format!(
739                                "kafka error when polling consumer for source: {} topic: {} : {}",
740                                reader.source_name, reader.topic_name, e
741                            );
742                            let status = HealthStatusUpdate::stalled(error, None);
743                            health_output.give(
744                                &health_cap,
745                                HealthStatusMessage {
746                                    id: None,
747                                    namespace: StatusNamespace::Kafka,
748                                    update: status.clone(),
749                                },
750                            );
751                            for (output, status) in outputs.iter().repeat_clone(status) {
752                                health_output.give(
753                                    &health_cap,
754                                    HealthStatusMessage {
755                                        id: Some(output.id),
756                                        namespace: StatusNamespace::Kafka,
757                                        update: status,
758                                    },
759                                );
760                            }
761                        }
762                        Ok(message) => {
763                            let output_messages = outputs
764                                .iter()
765                                .map(|output| {
766                                    let (message, ts) = construct_source_message(
767                                        &message,
768                                        &output.metadata_columns,
769                                    );
770                                    (output.output_index, message, ts)
771                                })
772                                // This vec allocation is required to allow obtaining a `&mut`
773                                // on `reader` for the `reader.handle_message` call in the
774                                // loop below since  `message` is borrowed from `reader`.
775                                .collect::<Vec<_>>();
776                            for (output_index, message, ts) in output_messages {
777                                if let Some((msg, time, diff)) =
778                                    reader.handle_message(message, ts, &output_index)
779                                {
780                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
781                                    let part_cap = &reader.partition_capabilities[pid].data;
782                                    let msg = msg.map_err(|e| {
783                                        DataflowError::SourceError(Box::new(SourceError {
784                                            error: SourceErrorDetails::Other(e.to_string().into()),
785                                        }))
786                                    });
787                                    data_output
788                                        .give_fueled(part_cap, ((output_index, msg), time, diff))
789                                        .await;
790                                }
791                            }
792                        }
793                    }
794                }
795
796                reader.update_stats();
797
798                // Take the consumers temporarily to get around borrow checker errors
799                let mut consumers = std::mem::take(&mut reader.partition_consumers);
800                for consumer in consumers.iter_mut() {
801                    let pid = consumer.pid();
802                    // We want to make sure the rest of the actions in the outer loops get
803                    // a chance to run. If rdkafka keeps pumping data at us we might find
804                    // ourselves in a situation where we keep dumping data into the
805                    // dataflow without signaling progress. For this reason we consume at most
806                    // 10k messages from each partition and go around the loop.
807                    let mut partition_exhausted = false;
808                    for _ in 0..10_000 {
809                        let Some(message) = consumer.get_next_message().transpose() else {
810                            partition_exhausted = true;
811                            break;
812                        };
813
814                        for output in outputs.iter() {
815                            let message = match &message {
816                                Ok((msg, pid)) => {
817                                    let (msg, ts) =
818                                        construct_source_message(msg, &output.metadata_columns);
819                                    assert_eq!(*pid, ts.0);
820                                    Ok(reader.handle_message(msg, ts, &output.output_index))
821                                }
822                                Err(err) => Err(err),
823                            };
824                            match message {
825                                Ok(Some((msg, time, diff))) => {
826                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
827                                    let part_cap = &reader.partition_capabilities[pid].data;
828                                    let msg = msg.map_err(|e| {
829                                        DataflowError::SourceError(Box::new(SourceError {
830                                            error: SourceErrorDetails::Other(e.to_string().into()),
831                                        }))
832                                    });
833                                    data_output
834                                        .give_fueled(
835                                            part_cap,
836                                            ((output.output_index, msg), time, diff),
837                                        )
838                                        .await;
839                                }
840                                // The message was from an offset we've already seen.
841                                Ok(None) => continue,
842                                Err(err) => {
843                                    let last_offset = reader
844                                        .last_offsets
845                                        .get(&output.output_index)
846                                        .expect("output known to be installed")
847                                        .get(&pid)
848                                        .expect("partition known to be installed");
849
850                                    let status = HealthStatusUpdate::stalled(
851                                        format!(
852                                            "error consuming from source: {} topic: {topic}:\
853                                             partition: {pid} last processed offset:\
854                                             {last_offset} : {err}",
855                                            config.name
856                                        ),
857                                        None,
858                                    );
859                                    health_output.give(
860                                        &health_cap,
861                                        HealthStatusMessage {
862                                            id: None,
863                                            namespace: StatusNamespace::Kafka,
864                                            update: status.clone(),
865                                        },
866                                    );
867                                    health_output.give(
868                                        &health_cap,
869                                        HealthStatusMessage {
870                                            id: Some(output.id),
871                                            namespace: StatusNamespace::Kafka,
872                                            update: status,
873                                        },
874                                    );
875                                }
876                            }
877                        }
878                    }
879                    if !partition_exhausted {
880                        notificator.notify_one();
881                    }
882                }
883                // We can now put them back
884                assert!(reader.partition_consumers.is_empty());
885                reader.partition_consumers = consumers;
886
887                let positions = reader.consumer.position().unwrap();
888                let topic_positions = positions.elements_for_topic(&reader.topic_name);
889                let mut snapshot_staged = 0;
890
891                for position in topic_positions {
892                    // The offset begins in the `Offset::Invalid` state in which case we simply
893                    // skip this partition.
894                    if let Offset::Offset(offset) = position.offset() {
895                        let pid = position.partition();
896                        let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
897                        let upper =
898                            Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);
899
900                        let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
901                        match part_cap.data.try_downgrade(&upper) {
902                            Ok(()) => {
903                                if !snapshot_export_stats.is_empty() {
904                                    // The `.position()` of the consumer represents what offset we have
905                                    // read up to.
906                                    snapshot_staged += offset.try_into().unwrap_or(0u64);
907                                    // This will always be `Some` at this point.
908                                    if let Some(snapshot_total) = snapshot_total {
909                                        // We will eventually read past the snapshot total, so we need
910                                        // to bound it here.
911                                        snapshot_staged =
912                                            std::cmp::min(snapshot_staged, snapshot_total);
913                                    }
914                                }
915                            }
916                            Err(_) => {
917                                // If we can't downgrade, it means we have already seen this offset.
918                                // This is expected and we can safely ignore it.
919                                info!(
920                                    source_id = config.id.to_string(),
921                                    worker_id = config.worker_id,
922                                    num_workers = config.worker_count,
923                                    "kafka source frontier downgrade skipped due to already \
924                                     seen offset: {:?}",
925                                    upper
926                                );
927                            }
928                        };
929
930                    }
931                }
932
933                if let (Some(snapshot_total), true) =
934                    (snapshot_total, !snapshot_export_stats.is_empty())
935                {
936                    for export_stat in snapshot_export_stats.iter() {
937                        export_stat.set_snapshot_records_known(snapshot_total);
938                        export_stat.set_snapshot_records_staged(snapshot_staged);
939                    }
940                    if snapshot_total == snapshot_staged {
941                        snapshot_export_stats.clear();
942                    }
943                }
944            }
945        })
946    });
947
948    (
949        stream.as_collection(),
950        health_stream,
951        button.press_on_drop(),
952    )
953}
954
955impl KafkaResumeUpperProcessor {
956    async fn process_frontier(
957        &self,
958        frontier: Antichain<KafkaTimestamp>,
959    ) -> Result<(), anyhow::Error> {
960        use rdkafka::consumer::CommitMode;
961
962        // Generate a list of partitions that this worker is responsible for
963        let mut offsets = vec![];
964        let mut offset_committed = 0;
965        for ts in frontier.iter() {
966            if let Some(pid) = ts.interval().singleton() {
967                let pid = pid.unwrap_exact();
968                if responsible_for_pid(&self.config, *pid) {
969                    offsets.push((pid.clone(), *ts.timestamp()));
970
971                    // Note that we do not subtract 1 from the frontier. Imagine
972                    // that frontier is 2 for this pid. That means we have
973                    // full processed offset 0 and offset 1, which means we have
974                    // processed _2_ offsets.
975                    offset_committed += ts.timestamp().offset;
976                }
977            }
978        }
979
980        for export_stat in self.statistics.iter() {
981            export_stat.set_offset_committed(offset_committed);
982        }
983
984        if !offsets.is_empty() {
985            let mut tpl = TopicPartitionList::new();
986            for (pid, offset) in offsets {
987                let offset_to_commit =
988                    Offset::Offset(offset.offset.try_into().expect("offset to be vald i64"));
989                tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
990                    .expect("offset known to be valid");
991            }
992            let consumer = Arc::clone(&self.consumer);
993            mz_ore::task::spawn_blocking(
994                || format!("source({}) kafka offset commit", self.config.id),
995                move || consumer.commit(&tpl, CommitMode::Sync),
996            )
997            .await?;
998        }
999        Ok(())
1000    }
1001}
1002
impl KafkaSourceReader {
    /// Ensures that a partition queue for `pid` exists.
    ///
    /// No-op when the source has no outputs (there is nothing to produce
    /// into) or when the partition is already tracked. Otherwise creates a
    /// dedicated partition queue starting at the configured start offset and
    /// seeds each output's last-offset bookkeeping for the partition.
    fn ensure_partition(&mut self, pid: PartitionId) {
        if self.last_offsets.is_empty() {
            tracing::info!(
                source_id = %self.id,
                worker_id = %self.worker_id,
                "kafka source does not have any outputs, not creating partition queue");

            return;
        }
        for last_offsets in self.last_offsets.values() {
            // early exit if we've already inserted this partition
            if last_offsets.contains_key(&pid) {
                return;
            }
        }

        let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
        self.create_partition_queue(pid, Offset::Offset(start_offset));

        for last_offsets in self.last_offsets.values_mut() {
            // Seed with `start_offset - 1`, i.e. "nothing consumed yet":
            // `handle_message` only accepts offsets strictly greater than the
            // recorded value.
            let prev = last_offsets.insert(pid, start_offset - 1);
            assert_none!(prev);
        }
    }

    /// Creates a new partition queue for `partition_id`.
    ///
    /// Extends the consumer's assignment with the new partition and splits it
    /// off into its own queue so it can be polled independently of the main
    /// consumer.
    fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
        info!(
            source_id = self.id.to_string(),
            worker_id = self.worker_id,
            num_workers = self.worker_count,
            "activating Kafka queue for topic {}, partition {}",
            self.topic_name,
            partition_id,
        );

        // Collect old partition assignments
        let tpl = self.consumer.assignment().unwrap();
        // Create list from assignments
        let mut partition_list = TopicPartitionList::new();
        for partition in tpl.elements_for_topic(&self.topic_name) {
            partition_list
                .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
                .expect("offset known to be valid");
        }
        // Add new partition
        partition_list
            .add_partition_offset(&self.topic_name, partition_id, initial_offset)
            .expect("offset known to be valid");
        self.consumer
            .assign(&partition_list)
            .expect("assignment known to be valid");

        // Since librdkafka v1.6.0, we need to recreate all partition queues
        // after every call to `self.consumer.assign`.
        let context = Arc::clone(self.consumer.context());
        for pc in &mut self.partition_consumers {
            pc.partition_queue = self
                .consumer
                .split_partition_queue(&self.topic_name, pc.pid)
                .expect("partition known to be valid");
            // Wake the source operator whenever this queue transitions from
            // empty to non-empty.
            pc.partition_queue.set_nonempty_callback({
                let context = Arc::clone(&context);
                move || context.inner().activate()
            });
        }

        let mut partition_queue = self
            .consumer
            .split_partition_queue(&self.topic_name, partition_id)
            .expect("partition known to be valid");
        partition_queue.set_nonempty_callback(move || context.inner().activate());
        self.partition_consumers
            .push(PartitionConsumer::new(partition_id, partition_queue));
        // Sanity check: every assigned partition of our topic has exactly one
        // dedicated partition consumer.
        assert_eq!(
            self.consumer
                .assignment()
                .unwrap()
                .elements_for_topic(&self.topic_name)
                .len(),
            self.partition_consumers.len()
        );
    }

    /// Read any statistics JSON blobs generated via the rdkafka statistics callback.
    ///
    /// Statistics arrive over `stats_rx` from the consumer context's
    /// `stats_raw` callback; this drains the channel and records the high
    /// watermark offset for each partition of our topic.
    fn update_stats(&mut self) {
        while let Ok(stats) = self.stats_rx.try_recv() {
            match serde_json::from_str::<Statistics>(&stats.to_string()) {
                Ok(statistics) => {
                    let topic = statistics.topics.get(&self.topic_name);
                    match topic {
                        Some(topic) => {
                            for (id, partition) in &topic.partitions {
                                self.partition_metrics
                                    .set_offset_max(*id, partition.hi_offset);
                            }
                        }
                        None => error!("No stats found for topic: {}", &self.topic_name),
                    }
                }
                Err(e) => {
                    error!("failed decoding librdkafka statistics JSON: {}", e);
                }
            }
        }
    }

    /// Checks if the given message is viable for emission. This checks if the message offset is
    /// past the expected offset and returns None if it is not.
    ///
    /// On success, also advances the recorded last offset for
    /// `(output_index, partition)` and returns the message together with its
    /// partitioned timestamp and a unit diff.
    fn handle_message(
        &mut self,
        message: Result<SourceMessage, KafkaHeaderParseError>,
        (partition, offset): (PartitionId, MzOffset),
        output_index: &usize,
    ) -> Option<(
        Result<SourceMessage, KafkaHeaderParseError>,
        KafkaTimestamp,
        Diff,
    )> {
        // Offsets are guaranteed to be 1) monotonically increasing *unless* there is
        // a network issue or a new partition added, at which point the consumer may
        // start processing the topic from the beginning, or we may see duplicate offsets
        // At all times, the guarantee : if we see offset x, we have seen all offsets [0,x-1]
        // that we are ever going to see holds.
        // Offsets are guaranteed to be contiguous when compaction is disabled. If compaction
        // is enabled, there may be gaps in the sequence.
        // If we see an "old" offset, we skip that message.

        // Given the explicit consumer to partition assignment, we should never receive a message
        // for a partition for which we have no metadata
        assert!(
            self.last_offsets
                .get(output_index)
                .unwrap()
                .contains_key(&partition)
        );

        let last_offset_ref = self
            .last_offsets
            .get_mut(output_index)
            .expect("output known to be installed")
            .get_mut(&partition)
            .expect("partition known to be installed");

        let last_offset = *last_offset_ref;
        let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
        if offset_as_i64 <= last_offset {
            info!(
                source_id = self.id.to_string(),
                worker_id = self.worker_id,
                num_workers = self.worker_count,
                "kafka message before expected offset: \
                 source {} (reading topic {}, partition {}, output {}) \
                 received offset {} expected offset {:?}",
                self.source_name,
                self.topic_name,
                partition,
                output_index,
                offset.offset,
                last_offset + 1,
            );
            // We explicitly should not consume the message as we have already processed it.
            None
        } else {
            *last_offset_ref = offset_as_i64;

            let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
            Some((message, ts, Diff::ONE))
        }
    }
}
1176
1177fn construct_source_message(
1178    msg: &BorrowedMessage<'_>,
1179    metadata_columns: &[KafkaMetadataKind],
1180) -> (
1181    Result<SourceMessage, KafkaHeaderParseError>,
1182    (PartitionId, MzOffset),
1183) {
1184    let pid = msg.partition();
1185    let Ok(offset) = u64::try_from(msg.offset()) else {
1186        panic!(
1187            "got negative offset ({}) from otherwise non-error'd kafka message",
1188            msg.offset()
1189        );
1190    };
1191
1192    let mut metadata = Row::default();
1193    let mut packer = metadata.packer();
1194    for kind in metadata_columns {
1195        match kind {
1196            KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
1197            KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
1198            KafkaMetadataKind::Timestamp => {
1199                let ts = msg
1200                    .timestamp()
1201                    .to_millis()
1202                    .expect("kafka sources always have upstream_time");
1203
1204                let d: Datum = DateTime::from_timestamp_millis(ts)
1205                    .and_then(|dt| {
1206                        let ct: Option<CheckedTimestamp<NaiveDateTime>> =
1207                            dt.naive_utc().try_into().ok();
1208                        ct
1209                    })
1210                    .into();
1211                packer.push(d)
1212            }
1213            KafkaMetadataKind::Header { key, use_bytes } => {
1214                match msg.headers() {
1215                    Some(headers) => {
1216                        let d = headers
1217                            .iter()
1218                            .filter(|header| header.key == key)
1219                            .last()
1220                            .map(|header| match header.value {
1221                                Some(v) => {
1222                                    if *use_bytes {
1223                                        Ok(Datum::Bytes(v))
1224                                    } else {
1225                                        match str::from_utf8(v) {
1226                                            Ok(str) => Ok(Datum::String(str)),
1227                                            Err(_) => Err(KafkaHeaderParseError::Utf8Error {
1228                                                key: key.clone(),
1229                                                raw: v.to_vec(),
1230                                            }),
1231                                        }
1232                                    }
1233                                }
1234                                None => Ok(Datum::Null),
1235                            })
1236                            .unwrap_or_else(|| {
1237                                Err(KafkaHeaderParseError::KeyNotFound { key: key.clone() })
1238                            });
1239                        match d {
1240                            Ok(d) => packer.push(d),
1241                            //abort with a definite error when the header is not found or cannot be parsed correctly
1242                            Err(err) => return (Err(err), (pid, offset.into())),
1243                        }
1244                    }
1245                    None => packer.push(Datum::Null),
1246                }
1247            }
1248            KafkaMetadataKind::Headers => {
1249                packer.push_list_with(|r| {
1250                    if let Some(headers) = msg.headers() {
1251                        for header in headers.iter() {
1252                            match header.value {
1253                                Some(v) => r.push_list_with(|record_row| {
1254                                    record_row.push(Datum::String(header.key));
1255                                    record_row.push(Datum::Bytes(v));
1256                                }),
1257                                None => r.push_list_with(|record_row| {
1258                                    record_row.push(Datum::String(header.key));
1259                                    record_row.push(Datum::Null);
1260                                }),
1261                            }
1262                        }
1263                    }
1264                });
1265            }
1266        }
1267    }
1268
1269    let key = match msg.key() {
1270        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1271        None => Row::pack([Datum::Null]),
1272    };
1273    let value = match msg.payload() {
1274        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1275        None => Row::pack([Datum::Null]),
1276    };
1277    (
1278        Ok(SourceMessage {
1279            key,
1280            value,
1281            metadata,
1282        }),
1283        (pid, offset.into()),
1284    )
1285}
1286
/// Wrapper around a partition containing the underlying consumer
struct PartitionConsumer {
    /// the partition id with which this consumer is associated
    pid: PartitionId,
    /// The underlying Kafka partition queue, split off from the main consumer
    /// so this partition can be polled independently.
    partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
}
1294
1295impl PartitionConsumer {
1296    /// Creates a new partition consumer from underlying Kafka consumer
1297    fn new(
1298        pid: PartitionId,
1299        partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1300    ) -> Self {
1301        PartitionConsumer {
1302            pid,
1303            partition_queue,
1304        }
1305    }
1306
1307    /// Returns the next message to process for this partition (if any).
1308    ///
1309    /// The outer `Result` represents irrecoverable failures, the inner one can and will
1310    /// be transformed into empty values.
1311    ///
1312    /// The inner `Option` represents if there is a message to process.
1313    fn get_next_message(&self) -> Result<Option<(BorrowedMessage<'_>, PartitionId)>, KafkaError> {
1314        match self.partition_queue.poll(Duration::from_millis(0)) {
1315            Some(Ok(msg)) => Ok(Some((msg, self.pid))),
1316            Some(Err(err)) => Err(err),
1317            _ => Ok(None),
1318        }
1319    }
1320
1321    /// Return the partition id for this PartitionConsumer
1322    fn pid(&self) -> PartitionId {
1323        self.pid
1324    }
1325}
1326
/// An implementation of [`ConsumerContext`] that forwards statistics to the
/// worker
struct GlueConsumerContext {
    /// Wakes the source operator when new data or statistics are available.
    notificator: Arc<Notify>,
    /// Channel over which decoded statistics JSON blobs are sent to the
    /// source operator; drained by `KafkaSourceReader::update_stats`.
    stats_tx: crossbeam_channel::Sender<Jsonb>,
    /// Underlying client context that provides logging and error forwarding.
    inner: MzClientContext,
}
1334
1335impl ClientContext for GlueConsumerContext {
1336    fn stats_raw(&self, statistics: &[u8]) {
1337        match Jsonb::from_slice(statistics) {
1338            Ok(statistics) => {
1339                self.stats_tx
1340                    .send(statistics)
1341                    .expect("timely operator hung up while Kafka source active");
1342                self.activate();
1343            }
1344            Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
1345        };
1346    }
1347
1348    // The shape of the rdkafka *Context traits require us to forward to the `MzClientContext`
1349    // implementation.
1350    fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
1351        self.inner.log(level, fac, log_message)
1352    }
1353    fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
1354        self.inner.error(error, reason)
1355    }
1356}
1357
impl GlueConsumerContext {
    /// Wakes the source operator, e.g. after new statistics were forwarded.
    fn activate(&self) {
        self.notificator.notify_one();
    }
}
1363
1364impl ConsumerContext for GlueConsumerContext {}
1365
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use mz_kafka_util::client::create_new_client_config_simple;
    use rdkafka::consumer::{BaseConsumer, Consumer};
    use rdkafka::{Message, Offset, TopicPartitionList};
    use uuid::Uuid;

    // Splitting off a partition queue with an `Offset` that is not `Offset::Beginning` seems to
    // lead to a race condition where sometimes we receive messages from polling the main consumer
    // instead of on the partition queue. This can be surfaced by running the test in a loop (in
    // the dataflow directory) using:
    //
    // cargo stress --lib --release source::kafka::tests::demonstrate_kafka_queue_race_condition
    //
    // cargo-stress can be installed via `cargo install cargo-stress`
    //
    // You need to set up a topic "queue-test" with 1000 "hello" messages in it. Obviously, running
    // this test requires a running Kafka instance at localhost:9092.
    #[mz_ore::test]
    #[ignore]
    fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
        let topic_name = "queue-test";
        let pid = 0;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
        kafka_config.set("enable.auto.commit", "false");
        kafka_config.set("group.id", Uuid::new_v4().to_string());
        // A tiny fetch size forces many fetch requests, which appears to widen
        // the window for the race to occur.
        kafka_config.set("fetch.message.max.bytes", "100");
        let consumer: BaseConsumer<_> = kafka_config.create()?;

        let consumer = Arc::new(consumer);

        let mut partition_list = TopicPartitionList::new();
        // Using Offset:Beginning here will work fine, only Offset:Offset(0) leads to the race
        // condition.
        partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;

        consumer.assign(&partition_list)?;

        let partition_queue = consumer
            .split_partition_queue(topic_name, pid)
            .expect("missing partition queue");

        let expected_messages = 1_000;

        let mut common_queue_count = 0;
        let mut partition_queue_count = 0;

        loop {
            // Once any message has arrived on the split-off partition queue,
            // no further messages for that partition should ever be delivered
            // via the common (main) consumer queue.
            if let Some(msg) = consumer.poll(Duration::from_millis(0)) {
                match msg {
                    Ok(msg) => {
                        let _payload =
                            std::str::from_utf8(msg.payload().expect("missing payload"))?;
                        if partition_queue_count > 0 {
                            anyhow::bail!(
                                "Got message from common queue after we internally switched to partition queue."
                            );
                        }

                        common_queue_count += 1;
                    }
                    Err(err) => anyhow::bail!("{}", err),
                }
            }

            match partition_queue.poll(Duration::from_millis(0)) {
                Some(Ok(msg)) => {
                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
                    partition_queue_count += 1;
                }
                Some(Err(err)) => anyhow::bail!("{}", err),
                _ => (),
            }

            // Stop once every message in the topic has been observed on one of
            // the two queues.
            if (common_queue_count + partition_queue_count) == expected_messages {
                break;
            }
        }

        assert!(
            common_queue_count == 0,
            "Got {} out of {} messages from common queue. Partition queue: {}",
            common_queue_count,
            expected_messages,
            partition_queue_count
        );

        Ok(())
    }
}
1461
1462/// Fetches the list of partitions and their corresponding high watermark.
1463fn fetch_partition_info<C: ConsumerContext>(
1464    consumer: &BaseConsumer<C>,
1465    topic: &str,
1466    fetch_timeout: Duration,
1467) -> Result<BTreeMap<PartitionId, HighWatermark>, GetPartitionsError> {
1468    let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;
1469
1470    let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
1471    for pid in pids {
1472        offset_requests.add_partition_offset(topic, pid, Offset::End)?;
1473    }
1474
1475    let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;
1476
1477    let mut result = BTreeMap::new();
1478    for entry in offset_responses.elements() {
1479        let offset = match entry.offset() {
1480            Offset::Offset(offset) => offset,
1481            offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
1482        };
1483
1484        let pid = entry.partition();
1485        let watermark = offset.try_into().expect("invalid negative offset");
1486        result.insert(pid, watermark);
1487    }
1488
1489    Ok(result)
1490}
1491
/// An update produced by the metadata fetcher.
///
/// Each update is either a fresh snapshot of the topic's partitions and their
/// high watermarks, or an error classified by whether the source can recover
/// from it.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum MetadataUpdate {
    /// The current IDs and high watermarks of all topic partitions.
    Partitions(BTreeMap<PartitionId, HighWatermark>),
    /// A transient error.
    ///
    /// Transient errors stall the source until their cause has been resolved.
    TransientError(HealthStatus),
    /// A definite error.
    ///
    /// Definite errors cannot be recovered from. They poison the source until the end of time.
    DefiniteError(SourceError),
}
1506
1507impl MetadataUpdate {
1508    /// Return the upstream frontier resulting from the metadata update, if any.
1509    fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
1510        match self {
1511            Self::Partitions(partitions) => {
1512                let max_pid = partitions.keys().last().copied();
1513                let lower = max_pid
1514                    .map(RangeBound::after)
1515                    .unwrap_or(RangeBound::NegInfinity);
1516                let future_ts =
1517                    Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
1518
1519                let mut frontier = Antichain::from_elem(future_ts);
1520                for (pid, high_watermark) in partitions {
1521                    frontier.insert(Partitioned::new_singleton(
1522                        RangeBound::exact(*pid),
1523                        MzOffset::from(*high_watermark),
1524                    ));
1525                }
1526
1527                Some(frontier)
1528            }
1529            Self::DefiniteError(_) => Some(Antichain::new()),
1530            Self::TransientError(_) => None,
1531        }
1532    }
1533}
1534
/// Errors that can occur while extracting a typed value from a Kafka message
/// header.
#[derive(Debug, thiserror::Error)]
pub enum KafkaHeaderParseError {
    /// The requested header key was absent from the message's headers.
    #[error("A header with key '{key}' was not found in the message headers")]
    KeyNotFound { key: String },
    /// The header value was present but not valid UTF-8.
    #[error(
        "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
    )]
    Utf8Error { key: String, raw: Vec<u8> },
}
1544
1545/// Render the metadata fetcher of a Kafka source.
1546///
1547/// The metadata fetcher is a single-worker operator that is responsible for periodically fetching
1548/// the Kafka topic metadata (partition IDs and high watermarks) and making it available as a
1549/// Timely stream.
1550fn render_metadata_fetcher<G: Scope<Timestamp = KafkaTimestamp>>(
1551    scope: &G,
1552    connection: KafkaSourceConnection,
1553    config: RawSourceCreationConfig,
1554) -> (
1555    StreamVec<G, (mz_repr::Timestamp, MetadataUpdate)>,
1556    StreamVec<G, Probe<KafkaTimestamp>>,
1557    PressOnDropButton,
1558) {
1559    let active_worker_id = usize::cast_from(config.id.hashed());
1560    let is_active_worker = active_worker_id % scope.peers() == scope.index();
1561
1562    let resume_upper = Antichain::from_iter(
1563        config
1564            .source_resume_uppers
1565            .values()
1566            .map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row))
1567            .flatten(),
1568    );
1569
1570    let name = format!("KafkaMetadataFetcher({})", config.id);
1571    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
1572
1573    let (metadata_output, metadata_stream) =
1574        builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1575    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1576
1577    let button = builder.build(move |caps| async move {
1578        if !is_active_worker {
1579            return;
1580        }
1581
1582        let [metadata_cap, probe_cap] = caps.try_into().unwrap();
1583
1584        let client_id = connection.client_id(
1585            config.config.config_set(),
1586            &config.config.connection_context,
1587            config.id,
1588        );
1589        let KafkaSourceConnection {
1590            connection,
1591            topic,
1592            topic_metadata_refresh_interval,
1593            ..
1594        } = connection;
1595
1596        let consumer: Result<BaseConsumer<_>, _> = connection
1597            .create_with_context(
1598                &config.config,
1599                MzClientContext::default(),
1600                &btreemap! {
1601                    // Use the user-configured topic metadata refresh
1602                    // interval.
1603                    "topic.metadata.refresh.interval.ms" =>
1604                        topic_metadata_refresh_interval
1605                        .as_millis()
1606                        .to_string(),
1607                    // Allow Kafka monitoring tools to identify this
1608                    // consumer.
1609                    "client.id" => format!("{client_id}-metadata"),
1610                },
1611                InTask::Yes,
1612            )
1613            .await;
1614
1615        let consumer = match consumer {
1616            Ok(consumer) => consumer,
1617            Err(e) => {
1618                let msg = format!(
1619                    "failed creating kafka metadata consumer: {}",
1620                    e.display_with_causes()
1621                );
1622                let status_update = HealthStatusUpdate::halting(msg, None);
1623                let status = match e {
1624                    ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
1625                    _ => HealthStatus::kafka(status_update),
1626                };
1627                let error = MetadataUpdate::TransientError(status);
1628                let timestamp = (config.now_fn)().into();
1629                metadata_output.give(&metadata_cap, (timestamp, error));
1630
1631                // IMPORTANT: wedge forever until the `SuspendAndRestart` is processed.
1632                // Returning would incorrectly present to the remap operator as progress to the
1633                // empty frontier which would be incorrectly recorded to the remap shard.
1634                std::future::pending::<()>().await;
1635                unreachable!("pending future never returns");
1636            }
1637        };
1638
1639        let (tx, mut rx) = mpsc::unbounded_channel();
1640        spawn_metadata_thread(config, consumer, topic, tx);
1641
1642        let mut prev_upstream_frontier = resume_upper;
1643
1644        while let Some((timestamp, mut update)) = rx.recv().await {
1645            if prev_upstream_frontier.is_empty() {
1646                return;
1647            }
1648
1649            if let Some(upstream_frontier) = update.upstream_frontier() {
1650                // Topics are identified by name but it's possible that a user recreates a topic
1651                // with the same name. Ideally we'd want to catch all of these cases and
1652                // immediately error out the source, since the data is effectively gone.
1653                // Unfortunately this is not possible without something like KIP-516.
1654                //
1655                // The best we can do is check whether the upstream frontier regressed. This tells
1656                // us that the topic was recreated and now contains fewer offsets and/or fewer
1657                // partitions. Note that we are not able to detect topic recreation if neither of
1658                // the two are true.
1659                if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
1660                    let error = SourceError {
1661                        error: SourceErrorDetails::Other("topic was recreated".into()),
1662                    };
1663                    update = MetadataUpdate::DefiniteError(error);
1664                }
1665            }
1666
1667            if let Some(upstream_frontier) = update.upstream_frontier() {
1668                prev_upstream_frontier = upstream_frontier.clone();
1669
1670                let probe = Probe {
1671                    probe_ts: timestamp,
1672                    upstream_frontier,
1673                };
1674                probe_output.give(&probe_cap, probe);
1675            }
1676
1677            metadata_output.give(&metadata_cap, (timestamp, update));
1678        }
1679    });
1680
1681    (metadata_stream, probe_stream, button.press_on_drop())
1682}
1683
1684fn spawn_metadata_thread<C: ConsumerContext>(
1685    config: RawSourceCreationConfig,
1686    consumer: BaseConsumer<TunnelingClientContext<C>>,
1687    topic: String,
1688    tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
1689) {
1690    // Linux thread names are limited to 15 characters. Use a truncated ID to fit the name.
1691    thread::Builder::new()
1692        .name(format!("kfk-mtdt-{}", config.id))
1693        .spawn(move || {
1694            trace!(
1695                source_id = config.id.to_string(),
1696                worker_id = config.worker_id,
1697                num_workers = config.worker_count,
1698                "kafka metadata thread: starting..."
1699            );
1700
1701            let timestamp_interval = config.timestamp_interval;
1702            let mut ticker = probe::Ticker::new(move || timestamp_interval, config.now_fn);
1703
1704            loop {
1705                let probe_ts = ticker.tick_blocking();
1706                let result = fetch_partition_info(
1707                    &consumer,
1708                    &topic,
1709                    config
1710                        .config
1711                        .parameters
1712                        .kafka_timeout_config
1713                        .fetch_metadata_timeout,
1714                );
1715                trace!(
1716                    source_id = config.id.to_string(),
1717                    worker_id = config.worker_id,
1718                    num_workers = config.worker_count,
1719                    "kafka metadata thread: metadata fetch result: {:?}",
1720                    result
1721                );
1722                let update = match result {
1723                    Ok(partitions) => {
1724                        trace!(
1725                            source_id = config.id.to_string(),
1726                            worker_id = config.worker_id,
1727                            num_workers = config.worker_count,
1728                            "kafka metadata thread: fetched partition metadata info",
1729                        );
1730
1731                        MetadataUpdate::Partitions(partitions)
1732                    }
1733                    Err(GetPartitionsError::TopicDoesNotExist) => {
1734                        let error = SourceError {
1735                            error: SourceErrorDetails::Other("topic was deleted".into()),
1736                        };
1737                        MetadataUpdate::DefiniteError(error)
1738                    }
1739                    Err(e) => {
1740                        let kafka_status = Some(HealthStatusUpdate::stalled(
1741                            format!("{}", e.display_with_causes()),
1742                            None,
1743                        ));
1744
1745                        let ssh_status = consumer.client().context().tunnel_status();
1746                        let ssh_status = match ssh_status {
1747                            SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
1748                            SshTunnelStatus::Errored(e) => {
1749                                Some(HealthStatusUpdate::stalled(e, None))
1750                            }
1751                        };
1752
1753                        MetadataUpdate::TransientError(HealthStatus {
1754                            kafka: kafka_status,
1755                            ssh: ssh_status,
1756                        })
1757                    }
1758                };
1759
1760                if tx.send((probe_ts, update)).is_err() {
1761                    break;
1762                }
1763            }
1764
1765            info!(
1766                source_id = config.id.to_string(),
1767                worker_id = config.worker_id,
1768                num_workers = config.worker_count,
1769                "kafka metadata thread: receiver has gone away; shutting down."
1770            )
1771        })
1772        .unwrap();
1773}