mz_storage/source/kafka.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::convert::Infallible;
use std::str::{self};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

use anyhow::anyhow;
use chrono::{DateTime, NaiveDateTime};
use differential_dataflow::{AsCollection, Hashable};
use futures::StreamExt;
use itertools::Itertools;
use maplit::btreemap;
use mz_kafka_util::client::{
    GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
};
use mz_ore::assert_none;
use mz_ore::cast::CastFrom;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::iter::IteratorExt;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
use mz_ssh_util::tunnel::SshTunnelStatus;
use mz_storage_types::dyncfgs::KAFKA_METADATA_FETCH_INTERVAL;
use mz_storage_types::errors::{
    ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
};
use mz_storage_types::sources::kafka::{
    KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
};
use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::containers::stack::AccountedStackBuilder;
use mz_timely_util::order::Partitioned;
use rdkafka::consumer::base_consumer::PartitionQueue;
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
use rdkafka::error::KafkaError;
use rdkafka::message::{BorrowedMessage, Headers};
use rdkafka::statistics::Statistics;
use rdkafka::topic_partition_list::Offset;
use rdkafka::{ClientContext, Message, TopicPartitionList};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::core::Partition;
use timely::dataflow::operators::{Broadcast, Capability};
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use timely::progress::Timestamp;
use tokio::sync::{Notify, mpsc};
use tracing::{error, info, trace};

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::metrics::source::kafka::KafkaSourceMetrics;
use crate::source::types::{Probe, SignaledFuture, SourceRender, StackedCollection};
use crate::source::{RawSourceCreationConfig, SourceMessage, probe};
use crate::statistics::SourceStatistics;

#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
struct HealthStatus {
    kafka: Option<HealthStatusUpdate>,
    ssh: Option<HealthStatusUpdate>,
}

impl HealthStatus {
    fn kafka(update: HealthStatusUpdate) -> Self {
        Self {
            kafka: Some(update),
            ssh: None,
        }
    }

    fn ssh(update: HealthStatusUpdate) -> Self {
        Self {
            kafka: None,
            ssh: Some(update),
        }
    }
}

/// Contains all information necessary to ingest data from Kafka
pub struct KafkaSourceReader {
    /// Name of the topic that backs this source
    topic_name: String,
    /// Name of the source (will have format kafka-source-id)
    source_name: String,
    /// Source global ID
    id: GlobalId,
    /// Kafka consumer for this source
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    /// List of consumers. A consumer should be assigned per partition to guarantee fairness
    partition_consumers: Vec<PartitionConsumer>,
    /// Worker ID
    worker_id: usize,
    /// Total count of workers
    worker_count: usize,
    /// The most recently read offset for each partition known to this source
    /// reader, keyed by output index. An offset of -1 indicates that no prior
    /// message has been read for the given partition.
    last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
    /// The offset to start reading from for each partition.
    start_offsets: BTreeMap<PartitionId, i64>,
    /// Channel to receive Kafka statistics JSON blobs from the stats callback.
    stats_rx: crossbeam_channel::Receiver<Jsonb>,
    /// A handle to the partition-specific metrics
    partition_metrics: KafkaSourceMetrics,
    /// Per-partition capabilities used to produce messages
    partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
}

struct PartitionCapability {
    /// The capability of the data produced
    data: Capability<KafkaTimestamp>,
    /// The capability of the progress stream
    progress: Capability<KafkaTimestamp>,
}

/// The high watermark offset of a Kafka partition.
///
/// This is the offset of the latest message in the topic/partition available for consumption + 1.
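/// For example, if the latest message available in a partition has offset 41, that partition's
/// high watermark is 42.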
type HighWatermark = u64;

/// Processes `resume_uppers` stream updates, committing them upstream and
/// storing them in the `progress_statistics` to be emitted later.
pub struct KafkaResumeUpperProcessor {
    config: RawSourceCreationConfig,
    topic_name: String,
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    statistics: Vec<SourceStatistics>,
}

/// Computes whether this worker is responsible for consuming a partition. It assigns partitions to
/// workers in a round-robin fashion, starting at an arbitrary worker based on the hash of the
/// source id.
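///
/// For example (illustrative numbers only): with `worker_count = 3` and a source whose hash
/// places the starting worker at index 1, partition 0 is handled by worker 1, partition 1 by
/// worker 2, partition 2 by worker 0, and so on.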
fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
    let pid = usize::try_from(pid).expect("positive pid");
    ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
}

struct SourceOutputInfo {
    id: GlobalId,
    output_index: usize,
    resume_upper: Antichain<KafkaTimestamp>,
    metadata_columns: Vec<KafkaMetadataKind>,
}

impl SourceRender for KafkaSourceConnection {
    // TODO(petrosagg): The type used for the partition (RangeBound<PartitionId>) doesn't need to
    // be so complicated and we could instead use `Partitioned<PartitionId, Option<u64>>` where all
    // ranges are inclusive and a time of `None` signifies that a particular partition is not
    // present. This requires a migration of the remap shard.
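
    // In the current encoding, a singleton element such as
    // `Partitioned::new_singleton(RangeBound::exact(pid), offset)` tracks the next offset to
    // read for partition `pid`, while a range element such as
    // `Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0))` stands in for
    // all partitions that have not been discovered yet.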
    type Time = KafkaTimestamp;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;

    fn render<G: Scope<Timestamp = KafkaTimestamp>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
        start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Option<Stream<G, Probe<KafkaTimestamp>>>,
        Vec<PressOnDropButton>,
    ) {
        let (metadata, probes, metadata_token) =
            render_metadata_fetcher(scope, self.clone(), config.clone());
        let (data, progress, health, reader_token) = render_reader(
            scope,
            self,
            config.clone(),
            resume_uppers,
            metadata,
            start_signal,
        );

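        // Split the reader's single output stream into one stream per source export, routing
        // each message by the output index it was tagged with.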
        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
            partition_count,
            |((output, data), time, diff): &(
                (usize, Result<SourceMessage, DataflowError>),
                _,
                Diff,
            )| {
                let output = u64::cast_from(*output);
                (output, (data.clone(), time.clone(), diff.clone()))
            },
        );
        let mut data_collections = BTreeMap::new();
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        (
            data_collections,
            progress,
            health,
            Some(probes),
            vec![metadata_token, reader_token],
        )
    }
}

/// Render the reader of a Kafka source.
///
/// The reader is responsible for polling the Kafka topic partitions for new messages, and
/// transforming them into a `SourceMessage` collection.
fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
    scope: &G,
    connection: KafkaSourceConnection,
    config: RawSourceCreationConfig,
    resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
    metadata_stream: Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
    start_signal: impl std::future::Future<Output = ()> + 'static,
) -> (
    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
    Stream<G, Infallible>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
) {
    let name = format!("KafkaReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());

    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (health_output, health_stream) = builder.new_output();

    let mut metadata_input = builder.new_disconnected_input(&metadata_stream.broadcast(), Pipeline);

    let mut outputs = vec![];

    // `SourceStatistics` entries for all exports, and separately for those exports that still
    // require an initial snapshot.
    let mut all_export_stats = vec![];
    let mut snapshot_export_stats = vec![];
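    // Build one `SourceOutputInfo` per source export, capturing its resume upper, its requested
    // metadata columns, and its statistics handle.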
    for (idx, (id, export)) in config.source_exports.iter().enumerate() {
        let SourceExport {
            details,
            storage_metadata: _,
            data_config: _,
        } = export;
        let resume_upper = Antichain::from_iter(
            config
                .source_resume_uppers
                .get(id)
                .expect("all source exports must be present in source resume uppers")
                .iter()
                .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
        );

        let metadata_columns = match details {
            SourceExportDetails::Kafka(details) => details
                .metadata_columns
                .iter()
                .map(|(_name, kind)| kind.clone())
                .collect::<Vec<_>>(),
            _ => panic!("unexpected source export details: {:?}", details),
        };

        let statistics = config
            .statistics
            .get(id)
            .expect("statistics have been initialized")
            .clone();
        // The export still requires a snapshot if its resume upper is the minimum frontier.
        if resume_upper.as_ref() == &[Partitioned::minimum()] {
            snapshot_export_stats.push(statistics.clone());
        }
        all_export_stats.push(statistics);

        let output = SourceOutputInfo {
            id: *id,
            resume_upper,
            output_index: idx,
            metadata_columns,
        };
        outputs.push(output);
    }

    let busy_signal = Arc::clone(&config.busy_signal);
    let button = builder.build(move |caps| {
        SignaledFuture::new(busy_signal, async move {
            let [mut data_cap, mut progress_cap, health_cap] = caps.try_into().unwrap();

            let client_id = connection.client_id(
                config.config.config_set(),
                &config.config.connection_context,
                config.id,
            );
            let group_id = connection.group_id(&config.config.connection_context, config.id);
            let KafkaSourceConnection {
                connection,
                topic,
                topic_metadata_refresh_interval,
                start_offsets,
                metadata_columns: _,
                // Exhaustive match protects against forgetting to apply an
                // option. Ignored fields are justified below.
                connection_id: _,   // not needed here
                group_id_prefix: _, // used above via `connection.group_id`
            } = connection;

            // `start_offsets` is a map from partition to the next offset to read from.
            let mut start_offsets: BTreeMap<_, i64> = start_offsets
                .clone()
                .into_iter()
                .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
                .map(|(k, v)| (k, v))
                .collect();

            let mut partition_capabilities = BTreeMap::new();
            let mut max_pid = None;
            let resume_upper = Antichain::from_iter(
                outputs
                    .iter()
                    .map(|output| output.resume_upper.clone())
                    .flatten(),
            );

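            // Initialize per-partition capabilities from the resume frontier: for every
            // partition this worker is responsible for, resume reading at the maximum of the
            // configured start offset and the offset restored from the frontier.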
            for ts in resume_upper.elements() {
                if let Some(pid) = ts.interval().singleton() {
                    let pid = pid.unwrap_exact();
                    max_pid = std::cmp::max(max_pid, Some(*pid));
                    if responsible_for_pid(&config, *pid) {
                        let restored_offset = i64::try_from(ts.timestamp().offset)
                            .expect("restored kafka offsets must fit into i64");
                        if let Some(start_offset) = start_offsets.get_mut(pid) {
                            *start_offset = std::cmp::max(restored_offset, *start_offset);
                        } else {
                            start_offsets.insert(*pid, restored_offset);
                        }

                        let part_ts = Partitioned::new_singleton(
                            RangeBound::exact(*pid),
                            ts.timestamp().clone(),
                        );
                        let part_cap = PartitionCapability {
                            data: data_cap.delayed(&part_ts),
                            progress: progress_cap.delayed(&part_ts),
                        };
                        partition_capabilities.insert(*pid, part_cap);
                    }
                }
            }
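            // Downgrade to a frontier that only leaves room for partitions we have not yet
            // discovered, i.e. everything strictly above the maximum known partition id.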
            let lower = max_pid
                .map(RangeBound::after)
                .unwrap_or(RangeBound::NegInfinity);
            let future_ts =
                Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
            data_cap.downgrade(&future_ts);
            progress_cap.downgrade(&future_ts);

            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "instantiating Kafka source reader at offsets {start_offsets:?}"
            );

            let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
            let notificator = Arc::new(Notify::new());

            let consumer: Result<BaseConsumer<_>, _> = connection
                .create_with_context(
                    &config.config,
                    GlueConsumerContext {
                        notificator: Arc::clone(&notificator),
                        stats_tx,
                        inner: MzClientContext::default(),
                    },
                    &btreemap! {
                        // Disable Kafka auto commit. We manually commit offsets
                        // to Kafka once we have reclocked those offsets, so
                        // that users can use standard Kafka tools for progress
                        // tracking.
                        "enable.auto.commit" => "false".into(),
                        // Always begin ingest at 0 when restarted, even if Kafka
                        // contains committed consumer read offsets
                        "auto.offset.reset" => "earliest".into(),
                        // Use the user-configured topic metadata refresh
                        // interval.
                        "topic.metadata.refresh.interval.ms" =>
                            topic_metadata_refresh_interval
                            .as_millis()
                            .to_string(),
                        // TODO: document the rationale for this.
                        "fetch.message.max.bytes" => "134217728".into(),
                        // Consumer group ID, which may have been overridden by
                        // the user. librdkafka requires this, and we use offset
                        // committing to provide a way for users to monitor
                        // ingest progress, though we do not rely on the
                        // committed offsets for any functionality.
                        "group.id" => group_id.clone(),
                        // Allow Kafka monitoring tools to identify this
                        // consumer.
                        "client.id" => client_id.clone(),
                    },
                    InTask::Yes,
                )
                .await;

            let consumer = match consumer {
                Ok(consumer) => Arc::new(consumer),
                Err(e) => {
                    let update = HealthStatusUpdate::halting(
                        format!(
                            "failed creating kafka reader consumer: {}",
                            e.display_with_causes()
                        ),
                        None,
                    );
                    health_output.give(
                        &health_cap,
                        HealthStatusMessage {
                            id: None,
                            namespace: if matches!(e, ContextCreationError::Ssh(_)) {
                                StatusNamespace::Ssh
                            } else {
                                StatusNamespace::Kafka
                            },
                            update: update.clone(),
                        },
                    );
                    for (output, update) in outputs.iter().repeat_clone(update) {
                        health_output.give(
                            &health_cap,
                            HealthStatusMessage {
                                id: Some(output.id),
                                namespace: if matches!(e, ContextCreationError::Ssh(_)) {
                                    StatusNamespace::Ssh
                                } else {
                                    StatusNamespace::Kafka
                                },
                                update,
                            },
                        );
                    }
                    // IMPORTANT: wedge forever until the `SuspendAndRestart` is processed.
                    // Returning would incorrectly present progress to the empty frontier to the
                    // remap operator, which would then be incorrectly recorded in the remap shard.
                    std::future::pending::<()>().await;
                    unreachable!("pending future never returns");
                }
            };

            // Note that we wait for this AFTER we downgrade to the source `resume_upper`. This
            // allows downstream operators (namely, the `reclock_operator`) to downgrade to the
            // `resume_upper`, which is necessary for this basic form of backpressure to work.
            start_signal.await;
            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka worker noticed rehydration is finished, starting partition queues..."
            );

            let partition_ids = start_offsets.keys().copied().collect();
            let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);

            let mut reader = KafkaSourceReader {
                topic_name: topic.clone(),
                source_name: config.name.clone(),
                id: config.id,
                partition_consumers: Vec::new(),
                consumer: Arc::clone(&consumer),
                worker_id: config.worker_id,
                worker_count: config.worker_count,
                last_offsets: outputs
                    .iter()
                    .map(|output| (output.output_index, BTreeMap::new()))
                    .collect(),
                start_offsets,
                stats_rx,
                partition_metrics: config.metrics.get_kafka_source_metrics(
                    partition_ids,
                    topic.clone(),
                    config.id,
                ),
                partition_capabilities,
            };

            let offset_committer = KafkaResumeUpperProcessor {
                config: config.clone(),
                topic_name: topic.clone(),
                consumer,
                statistics: all_export_stats.clone(),
            };

            // Seed the progress metrics with `0` if we are snapshotting.
            if !snapshot_export_stats.is_empty() {
                if let Err(e) = offset_committer
                    .process_frontier(resume_upper.clone())
                    .await
                {
                    offset_commit_metrics.offset_commit_failures.inc();
                    tracing::warn!(
                        %e,
                        "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
                        worker_id = config.worker_id,
                        source_id = config.id,
                        upper = resume_upper.pretty()
                    );
                }
                // Reset the snapshot statistics to zero for all exports. Exports that are
                // snapshotting this round will see these values updated as the snapshot
                // commences; the rest stay at zero.
                for statistics in config.statistics.values() {
                    statistics.set_snapshot_records_known(0);
                    statistics.set_snapshot_records_staged(0);
                }
            }

            let resume_uppers_process_loop = async move {
                tokio::pin!(resume_uppers);
                while let Some(frontier) = resume_uppers.next().await {
                    if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
                        offset_commit_metrics.offset_commit_failures.inc();
                        tracing::warn!(
                            %e,
                            "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
                            worker_id = config.worker_id,
                            source_id = config.id,
                            upper = frontier.pretty()
                        );
                    }
                }
                // During dataflow shutdown this loop can end due to the general chaos caused by
                // dropping tokens as a means of shutting down. This call ensures this future
                // never ends and we instead rely on this operator being dropped altogether when
                // *its* token is dropped.
                std::future::pending::<()>().await;
            };
            tokio::pin!(resume_uppers_process_loop);

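            // The most recent metadata update that has not yet been processed, and the total
            // number of offsets in the initial snapshot once it is known.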
            let mut metadata_update: Option<MetadataUpdate> = None;
            let mut snapshot_total = None;

            let max_wait_time =
                mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
            loop {
                // Wait for data or metadata events while also making progress with offset
                // committing.
                tokio::select! {
                    // TODO(petrosagg): remove the timeout and rely purely on librdkafka waking us
                    // up
                    _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},

                    _ = metadata_input.ready() => {
                        // Collect all pending updates, then only keep the most recent one.
                        let mut updates = Vec::new();
                        while let Some(event) = metadata_input.next_sync() {
                            if let Event::Data(_, mut data) = event {
                                updates.append(&mut data);
                            }
                        }
                        metadata_update = updates
                            .into_iter()
                            .max_by_key(|(ts, _)| *ts)
                            .map(|(_, update)| update);
                    }

                    // This future is not cancel-safe but we are only passing a reference to it in
                    // the select! loop so the future stays on the stack and never gets cancelled
                    // until the end of the function.
                    _ = resume_uppers_process_loop.as_mut() => {},
                }

                match metadata_update.take() {
                    Some(MetadataUpdate::Partitions(partitions)) => {
                        let max_pid = partitions.keys().last().cloned();
                        let lower = max_pid
                            .map(RangeBound::after)
                            .unwrap_or(RangeBound::NegInfinity);
                        let future_ts = Partitioned::new_range(
                            lower,
                            RangeBound::PosInfinity,
                            MzOffset::from(0),
                        );

                        let mut offset_known = 0;
                        for (&pid, &high_watermark) in &partitions {
                            if responsible_for_pid(&config, pid) {
                                offset_known += high_watermark;
                                reader.ensure_partition(pid);
                                if let Entry::Vacant(entry) =
                                    reader.partition_capabilities.entry(pid)
                                {
                                    let start_offset = match reader.start_offsets.get(&pid) {
                                        Some(&offset) => offset.try_into().unwrap(),
                                        None => 0u64,
                                    };
                                    let part_since_ts = Partitioned::new_singleton(
                                        RangeBound::exact(pid),
                                        MzOffset::from(start_offset),
                                    );
                                    let part_upper_ts = Partitioned::new_singleton(
                                        RangeBound::exact(pid),
                                        MzOffset::from(high_watermark),
                                    );

                                    // This is the moment at which we have discovered a new partition
                                    // and we need to make sure we produce its initial snapshot at a
                                    // single timestamp so that the source transitions from no data
                                    // from this partition to all the data of this partition. We do
                                    // this by initializing the data capability to the starting offset
                                    // and, importantly, the progress capability directly to the high
                                    // watermark. This jump of the progress capability ensures that
                                    // everything until the high watermark will be reclocked to a
                                    // single point.
                                    entry.insert(PartitionCapability {
                                        data: data_cap.delayed(&part_since_ts),
                                        progress: progress_cap.delayed(&part_upper_ts),
                                    });
                                }
                            }
                        }

                        // If we are snapshotting, record our first set of partitions as the snapshot
                        // size.
                        if !snapshot_export_stats.is_empty() && snapshot_total.is_none() {
                            // Note that we want to represent the _number of offsets_, which
                            // means the watermark's frontier semantics are correct, without
                            // subtracting (Kafka offsets start at 0).
                            snapshot_total = Some(offset_known);
                        }

                        // Clear all the health namespaces we know about.
                        // Note that many Kafka sources don't have an SSH tunnel, but the
                        // `health_operator` handles this fine.
                        for output in &outputs {
                            for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace,
                                        update: HealthStatusUpdate::running(),
                                    },
                                );
                            }
                        }
                        for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace,
                                    update: HealthStatusUpdate::running(),
                                },
                            );
                        }

                        for export_stat in all_export_stats.iter() {
                            export_stat.set_offset_known(offset_known);
                        }

                        data_cap.downgrade(&future_ts);
                        progress_cap.downgrade(&future_ts);
                    }
                    Some(MetadataUpdate::TransientError(status)) => {
                        if let Some(update) = status.kafka {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Kafka,
                                    update: update.clone(),
                                },
                            );
                            for (output, update) in outputs.iter().repeat_clone(update) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Kafka,
                                        update,
                                    },
                                );
                            }
                        }
                        if let Some(update) = status.ssh {
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Ssh,
                                    update: update.clone(),
                                },
                            );
                            for (output, update) in outputs.iter().repeat_clone(update) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Ssh,
                                        update,
                                    },
                                );
                            }
                        }
                    }
                    Some(MetadataUpdate::DefiniteError(error)) => {
                        health_output.give(
                            &health_cap,
                            HealthStatusMessage {
                                id: None,
                                namespace: StatusNamespace::Kafka,
                                update: HealthStatusUpdate::stalled(
                                    error.to_string(),
                                    None,
                                ),
                            },
                        );
                        let error = Err(error.into());
                        let time = data_cap.time().clone();
                        for (output, error) in
                            outputs.iter().map(|o| o.output_index).repeat_clone(error)
                        {
                            data_output
                                .give_fueled(&data_cap, ((output, error), time, Diff::ONE))
                                .await;
                        }

                        return;
                    }
                    None => {}
                }

                // Poll the consumer once. We split the consumer's partitions out into separate
                // queues and poll those individually, but it's still necessary to drive logic that
                // consumes from rdkafka's internal event queue, such as statistics callbacks.
                //
                // Additionally, assigning topics and splitting them off into separate queues is
                // not atomic, so we expect at least some messages to show up when polling
                // the consumer directly.
                while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
                    match result {
                        Err(e) => {
                            let error = format!(
                                "kafka error when polling consumer for source: {} topic: {} : {}",
                                reader.source_name, reader.topic_name, e
                            );
                            let status = HealthStatusUpdate::stalled(error, None);
                            health_output.give(
                                &health_cap,
                                HealthStatusMessage {
                                    id: None,
                                    namespace: StatusNamespace::Kafka,
                                    update: status.clone(),
                                },
                            );
                            for (output, status) in outputs.iter().repeat_clone(status) {
                                health_output.give(
                                    &health_cap,
                                    HealthStatusMessage {
                                        id: Some(output.id),
                                        namespace: StatusNamespace::Kafka,
                                        update: status,
                                    },
                                );
                            }
                        }
                        Ok(message) => {
                            let output_messages = outputs
                                .iter()
                                .map(|output| {
                                    let (message, ts) = construct_source_message(
                                        &message,
                                        &output.metadata_columns,
                                    );
                                    (output.output_index, message, ts)
                                })
                                // This vec allocation is required to allow obtaining a `&mut`
                                // on `reader` for the `reader.handle_message` call in the
                                // loop below, since `message` is borrowed from `reader`.
                                .collect::<Vec<_>>();
                            for (output_index, message, ts) in output_messages {
                                if let Some((msg, time, diff)) =
                                    reader.handle_message(message, ts, &output_index)
                                {
                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
                                    let part_cap = &reader.partition_capabilities[pid].data;
                                    let msg = msg.map_err(|e| {
                                        DataflowError::SourceError(Box::new(SourceError {
                                            error: SourceErrorDetails::Other(e.to_string().into()),
                                        }))
                                    });
                                    data_output
                                        .give_fueled(part_cap, ((output_index, msg), time, diff))
                                        .await;
                                }
                            }
                        }
                    }
                }

                reader.update_stats();

                // Take the consumers temporarily to get around borrow checker errors
                let mut consumers = std::mem::take(&mut reader.partition_consumers);
                for consumer in consumers.iter_mut() {
                    let pid = consumer.pid();
                    // We want to make sure the rest of the actions in the outer loops get
                    // a chance to run. If rdkafka keeps pumping data at us we might find
                    // ourselves in a situation where we keep dumping data into the
                    // dataflow without signaling progress. For this reason we consume at most
                    // 10k messages from each partition and go around the loop.
                    let mut partition_exhausted = false;
                    for _ in 0..10_000 {
                        let Some(message) = consumer.get_next_message().transpose() else {
                            partition_exhausted = true;
                            break;
                        };

                        for output in outputs.iter() {
                            let message = match &message {
                                Ok((msg, pid)) => {
                                    let (msg, ts) =
                                        construct_source_message(msg, &output.metadata_columns);
                                    assert_eq!(*pid, ts.0);
                                    Ok(reader.handle_message(msg, ts, &output.output_index))
                                }
                                Err(err) => Err(err),
                            };
                            match message {
                                Ok(Some((msg, time, diff))) => {
                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
                                    let part_cap = &reader.partition_capabilities[pid].data;
                                    let msg = msg.map_err(|e| {
                                        DataflowError::SourceError(Box::new(SourceError {
                                            error: SourceErrorDetails::Other(e.to_string().into()),
                                        }))
                                    });
                                    data_output
                                        .give_fueled(
                                            part_cap,
                                            ((output.output_index, msg), time, diff),
                                        )
                                        .await;
                                }
                                // The message was from an offset we've already seen.
                                Ok(None) => continue,
                                Err(err) => {
                                    let last_offset = reader
                                        .last_offsets
                                        .get(&output.output_index)
                                        .expect("output known to be installed")
                                        .get(&pid)
                                        .expect("partition known to be installed");

                                    let status = HealthStatusUpdate::stalled(
                                        format!(
                                            "error consuming from source: {} topic: {topic}:\
                                             partition: {pid} last processed offset:\
                                             {last_offset} : {err}",
                                            config.name
                                        ),
                                        None,
                                    );
                                    health_output.give(
                                        &health_cap,
                                        HealthStatusMessage {
                                            id: None,
                                            namespace: StatusNamespace::Kafka,
                                            update: status.clone(),
                                        },
                                    );
                                    health_output.give(
                                        &health_cap,
                                        HealthStatusMessage {
                                            id: Some(output.id),
                                            namespace: StatusNamespace::Kafka,
                                            update: status,
                                        },
                                    );
                                }
                            }
                        }
                    }
                    if !partition_exhausted {
                        notificator.notify_one();
                    }
                }
                // We can now put them back
                assert!(reader.partition_consumers.is_empty());
                reader.partition_consumers = consumers;

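                // Use the consumer's current position for each partition to downgrade our data
                // and progress capabilities and to track snapshot progress.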
                let positions = reader.consumer.position().unwrap();
                let topic_positions = positions.elements_for_topic(&reader.topic_name);
                let mut snapshot_staged = 0;

                for position in topic_positions {
                    // The offset begins in the `Offset::Invalid` state, in which case we simply
                    // skip this partition.
                    if let Offset::Offset(offset) = position.offset() {
                        let pid = position.partition();
                        let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
                        let upper =
                            Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);

                        let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
                        match part_cap.data.try_downgrade(&upper) {
                            Ok(()) => {
                                if !snapshot_export_stats.is_empty() {
                                    // The `.position()` of the consumer represents what offset we have
                                    // read up to.
                                    snapshot_staged += offset.try_into().unwrap_or(0u64);
                                    // This will always be `Some` at this point.
                                    if let Some(snapshot_total) = snapshot_total {
                                        // We will eventually read past the snapshot total, so we need
                                        // to bound it here.
                                        snapshot_staged =
                                            std::cmp::min(snapshot_staged, snapshot_total);
                                    }
                                }
                            }
                            Err(_) => {
                                // If we can't downgrade, it means we have already seen this offset.
                                // This is expected and we can safely ignore it.
                                info!(
                                    source_id = config.id.to_string(),
                                    worker_id = config.worker_id,
                                    num_workers = config.worker_count,
                                    "kafka source frontier downgrade skipped due to already \
                                     seen offset: {:?}",
                                    upper
                                );
                            }
                        };

                        // We use try_downgrade here because during the initial snapshot phase the
                        // data capability is not beyond the progress capability and therefore a
                        // normal downgrade would panic. Once it catches up though the data
                        // capability is what's pushing the progress capability forward.
                        let _ = part_cap.progress.try_downgrade(&upper);
                    }
                }

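                // Report snapshot progress and stop tracking it once the staged count has
                // caught up with the total.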
                if let (Some(snapshot_total), true) =
                    (snapshot_total, !snapshot_export_stats.is_empty())
                {
                    for export_stat in snapshot_export_stats.iter() {
                        export_stat.set_snapshot_records_known(snapshot_total);
                        export_stat.set_snapshot_records_staged(snapshot_staged);
                    }
                    if snapshot_total == snapshot_staged {
                        snapshot_export_stats.clear();
                    }
                }
            }
        })
    });

    (
        stream.as_collection(),
        progress_stream,
        health_stream,
        button.press_on_drop(),
    )
}

impl KafkaResumeUpperProcessor {
    async fn process_frontier(
        &self,
        frontier: Antichain<KafkaTimestamp>,
    ) -> Result<(), anyhow::Error> {
        use rdkafka::consumer::CommitMode;

        // Generate a list of partitions that this worker is responsible for
        let mut offsets = vec![];
        let mut offset_committed = 0;
        for ts in frontier.iter() {
            if let Some(pid) = ts.interval().singleton() {
                let pid = pid.unwrap_exact();
                if responsible_for_pid(&self.config, *pid) {
                    offsets.push((pid.clone(), *ts.timestamp()));

                    // Note that we do not subtract 1 from the frontier. Imagine
                    // that the frontier is 2 for this pid. That means we have
                    // fully processed offset 0 and offset 1, which means we have
                    // processed _2_ offsets.
                    offset_committed += ts.timestamp().offset;
                }
            }
        }

        for export_stat in self.statistics.iter() {
            export_stat.set_offset_committed(offset_committed);
        }

        if !offsets.is_empty() {
            let mut tpl = TopicPartitionList::new();
            for (pid, offset) in offsets {
                let offset_to_commit =
                    Offset::Offset(offset.offset.try_into().expect("offset to be valid i64"));
                tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
                    .expect("offset known to be valid");
            }
            let consumer = Arc::clone(&self.consumer);
            mz_ore::task::spawn_blocking(
                || format!("source({}) kafka offset commit", self.config.id),
                move || consumer.commit(&tpl, CommitMode::Sync),
            )
            .await??;
        }
        Ok(())
    }
}

impl KafkaSourceReader {
    /// Ensures that a partition queue for `pid` exists.
    fn ensure_partition(&mut self, pid: PartitionId) {
        if self.last_offsets.is_empty() {
            tracing::info!(
                source_id = %self.id,
                worker_id = %self.worker_id,
                "kafka source does not have any outputs, not creating partition queue");

            return;
        }
        for last_offsets in self.last_offsets.values() {
            // early exit if we've already inserted this partition
            if last_offsets.contains_key(&pid) {
                return;
            }
        }

        let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
        self.create_partition_queue(pid, Offset::Offset(start_offset));

        for last_offsets in self.last_offsets.values_mut() {
            let prev = last_offsets.insert(pid, start_offset - 1);
            assert_none!(prev);
        }
    }

    /// Creates a new partition queue for `partition_id`.
    fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
        info!(
            source_id = self.id.to_string(),
            worker_id = self.worker_id,
            num_workers = self.worker_count,
            "activating Kafka queue for topic {}, partition {}",
            self.topic_name,
            partition_id,
        );

        // Collect old partition assignments
        let tpl = self.consumer.assignment().unwrap();
        // Create list from assignments
        let mut partition_list = TopicPartitionList::new();
        for partition in tpl.elements_for_topic(&self.topic_name) {
            partition_list
                .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
                .expect("offset known to be valid");
        }
        // Add new partition
        partition_list
            .add_partition_offset(&self.topic_name, partition_id, initial_offset)
            .expect("offset known to be valid");
        self.consumer
            .assign(&partition_list)
            .expect("assignment known to be valid");

        // Since librdkafka v1.6.0, we need to recreate all partition queues
        // after every call to `self.consumer.assign`.
        let context = Arc::clone(self.consumer.context());
        for pc in &mut self.partition_consumers {
            pc.partition_queue = self
                .consumer
                .split_partition_queue(&self.topic_name, pc.pid)
                .expect("partition known to be valid");
            pc.partition_queue.set_nonempty_callback({
                let context = Arc::clone(&context);
                move || context.inner().activate()
            });
        }

        let mut partition_queue = self
            .consumer
            .split_partition_queue(&self.topic_name, partition_id)
            .expect("partition known to be valid");
        partition_queue.set_nonempty_callback(move || context.inner().activate());
        self.partition_consumers
            .push(PartitionConsumer::new(partition_id, partition_queue));
        assert_eq!(
            self.consumer
                .assignment()
                .unwrap()
                .elements_for_topic(&self.topic_name)
                .len(),
            self.partition_consumers.len()
        );
    }

    /// Read any statistics JSON blobs generated via the rdkafka statistics callback.
    fn update_stats(&mut self) {
        while let Ok(stats) = self.stats_rx.try_recv() {
            match serde_json::from_str::<Statistics>(&stats.to_string()) {
                Ok(statistics) => {
                    let topic = statistics.topics.get(&self.topic_name);
                    match topic {
                        Some(topic) => {
                            for (id, partition) in &topic.partitions {
                                self.partition_metrics
                                    .set_offset_max(*id, partition.hi_offset);
                            }
                        }
                        None => error!("No stats found for topic: {}", &self.topic_name),
                    }
                }
                Err(e) => {
                    error!("failed decoding librdkafka statistics JSON: {}", e);
                }
            }
        }
    }

    /// Checks if the given message is viable for emission. This checks if the message offset is
    /// past the expected offset and returns `None` if it is not.
    fn handle_message(
        &mut self,
        message: Result<SourceMessage, KafkaHeaderParseError>,
        (partition, offset): (PartitionId, MzOffset),
        output_index: &usize,
    ) -> Option<(
        Result<SourceMessage, KafkaHeaderParseError>,
        KafkaTimestamp,
        Diff,
    )> {
        // Offsets are guaranteed to be monotonically increasing *unless* there is a network
        // issue or a new partition is added, at which point the consumer may start processing
        // the topic from the beginning or we may see duplicate offsets. At all times the
        // guarantee holds that if we see offset x, we have seen all offsets in [0, x-1] that we
        // are ever going to see.
        // Offsets are guaranteed to be contiguous when compaction is disabled. If compaction
        // is enabled, there may be gaps in the sequence.
        // If we see an "old" offset, we skip that message.

        // Given the explicit consumer-to-partition assignment, we should never receive a
        // message for a partition for which we have no metadata.
        assert!(
            self.last_offsets
                .get(output_index)
                .unwrap()
                .contains_key(&partition)
        );

        let last_offset_ref = self
            .last_offsets
            .get_mut(output_index)
            .expect("output known to be installed")
            .get_mut(&partition)
            .expect("partition known to be installed");

        let last_offset = *last_offset_ref;
        let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
        if offset_as_i64 <= last_offset {
            info!(
                source_id = self.id.to_string(),
                worker_id = self.worker_id,
                num_workers = self.worker_count,
                "kafka message before expected offset: \
                 source {} (reading topic {}, partition {}, output {}) \
                 received offset {} expected offset {:?}",
                self.source_name,
                self.topic_name,
                partition,
                output_index,
                offset.offset,
                last_offset + 1,
            );
            // We explicitly should not consume the message as we have already processed it.
            None
        } else {
            *last_offset_ref = offset_as_i64;

            let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
            Some((message, ts, Diff::ONE))
        }
    }
}
1200
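/// Converts a Kafka message into a `SourceMessage`, along with its partition ID and offset,
/// rendering any requested metadata columns into the message's metadata row. Failures to
/// extract a requested header are returned as `KafkaHeaderParseError`s.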
1201fn construct_source_message(
1202    msg: &BorrowedMessage<'_>,
1203    metadata_columns: &[KafkaMetadataKind],
1204) -> (
1205    Result<SourceMessage, KafkaHeaderParseError>,
1206    (PartitionId, MzOffset),
1207) {
1208    let pid = msg.partition();
1209    let Ok(offset) = u64::try_from(msg.offset()) else {
1210        panic!(
1211            "got negative offset ({}) from otherwise non-error'd kafka message",
1212            msg.offset()
1213        );
1214    };
1215
1216    let mut metadata = Row::default();
1217    let mut packer = metadata.packer();
1218    for kind in metadata_columns {
1219        match kind {
1220            KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
1221            KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
1222            KafkaMetadataKind::Timestamp => {
1223                let ts = msg
1224                    .timestamp()
1225                    .to_millis()
1226                    .expect("kafka sources always have upstream_time");
1227
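                // Convert the upstream timestamp (milliseconds since the Unix epoch) into a
                // checked timestamp; out-of-range values are rendered as NULL.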
1228                let d: Datum = DateTime::from_timestamp_millis(ts)
1229                    .and_then(|dt| {
1230                        let ct: Option<CheckedTimestamp<NaiveDateTime>> =
1231                            dt.naive_utc().try_into().ok();
1232                        ct
1233                    })
1234                    .into();
1235                packer.push(d)
1236            }
1237            KafkaMetadataKind::Header { key, use_bytes } => {
1238                match msg.headers() {
1239                    Some(headers) => {
1240                        let d = headers
1241                            .iter()
1242                            .filter(|header| header.key == key)
1243                            .last()
1244                            .map(|header| match header.value {
1245                                Some(v) => {
1246                                    if *use_bytes {
1247                                        Ok(Datum::Bytes(v))
1248                                    } else {
1249                                        match str::from_utf8(v) {
1250                                            Ok(str) => Ok(Datum::String(str)),
1251                                            Err(_) => Err(KafkaHeaderParseError::Utf8Error {
1252                                                key: key.clone(),
1253                                                raw: v.to_vec(),
1254                                            }),
1255                                        }
1256                                    }
1257                                }
1258                                None => Ok(Datum::Null),
1259                            })
1260                            .unwrap_or_else(|| {
1261                                Err(KafkaHeaderParseError::KeyNotFound { key: key.clone() })
1262                            });
1263                        match d {
1264                            Ok(d) => packer.push(d),
1265                            // Abort with a definite error when the header is not found or cannot be parsed correctly.
1266                            Err(err) => return (Err(err), (pid, offset.into())),
1267                        }
1268                    }
1269                    None => packer.push(Datum::Null),
1270                }
1271            }
1272            KafkaMetadataKind::Headers => {
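                // Render all headers as a list of (key, value) records, preserving NULL values.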
1273                packer.push_list_with(|r| {
1274                    if let Some(headers) = msg.headers() {
1275                        for header in headers.iter() {
1276                            match header.value {
1277                                Some(v) => r.push_list_with(|record_row| {
1278                                    record_row.push(Datum::String(header.key));
1279                                    record_row.push(Datum::Bytes(v));
1280                                }),
1281                                None => r.push_list_with(|record_row| {
1282                                    record_row.push(Datum::String(header.key));
1283                                    record_row.push(Datum::Null);
1284                                }),
1285                            }
1286                        }
1287                    }
1288                });
1289            }
1290        }
1291    }
1292
1293    let key = match msg.key() {
1294        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1295        None => Row::pack([Datum::Null]),
1296    };
1297    let value = match msg.payload() {
1298        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1299        None => Row::pack([Datum::Null]),
1300    };
1301    (
1302        Ok(SourceMessage {
1303            key,
1304            value,
1305            metadata,
1306        }),
1307        (pid, offset.into()),
1308    )
1309}
1310
1311/// Wrapper around a Kafka partition queue, together with the ID of the partition it reads.
1312struct PartitionConsumer {
1313    /// The partition ID with which this consumer is associated.
1314    pid: PartitionId,
1315    /// The underlying Kafka partition queue
1316    partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1317}
1318
1319impl PartitionConsumer {
1320    /// Creates a new partition consumer from the underlying Kafka partition queue.
1321    fn new(
1322        pid: PartitionId,
1323        partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1324    ) -> Self {
1325        PartitionConsumer {
1326            pid,
1327            partition_queue,
1328        }
1329    }
1330
1331    /// Returns the next message to process for this partition (if any).
1332    ///
1333    /// The `Result` represents irrecoverable Kafka failures.
1334    ///
1335    /// The `Option` indicates whether a message is currently available on the
1336    /// partition queue.
1337    fn get_next_message(&self) -> Result<Option<(BorrowedMessage<'_>, PartitionId)>, KafkaError> {
1338        match self.partition_queue.poll(Duration::from_millis(0)) {
1339            Some(Ok(msg)) => Ok(Some((msg, self.pid))),
1340            Some(Err(err)) => Err(err),
1341            _ => Ok(None),
1342        }
1343    }
1344
1345    /// Return the partition id for this PartitionConsumer
1346    fn pid(&self) -> PartitionId {
1347        self.pid
1348    }
1349}
1350
1351/// An implementation of [`ConsumerContext`] that forwards statistics to the
1352/// worker
1353struct GlueConsumerContext {
1354    notificator: Arc<Notify>,
1355    stats_tx: crossbeam_channel::Sender<Jsonb>,
1356    inner: MzClientContext,
1357}
1358
1359impl ClientContext for GlueConsumerContext {
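    // Forward librdkafka statistics to the source operator and wake it up so the
    // statistics are processed promptly.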
1360    fn stats_raw(&self, statistics: &[u8]) {
1361        match Jsonb::from_slice(statistics) {
1362            Ok(statistics) => {
1363                self.stats_tx
1364                    .send(statistics)
1365                    .expect("timely operator hung up while Kafka source active");
1366                self.activate();
1367            }
1368            Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
1369        };
1370    }
1371
1372    // The shape of the rdkafka *Context traits requires us to forward to the
1373    // `MzClientContext` implementation.
1374    fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
1375        self.inner.log(level, fac, log_message)
1376    }
1377    fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
1378        self.inner.error(error, reason)
1379    }
1380}
1381
1382impl GlueConsumerContext {
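    /// Wakes up the source operator that is waiting on this context's notificator.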
1383    fn activate(&self) {
1384        self.notificator.notify_one();
1385    }
1386}
1387
1388impl ConsumerContext for GlueConsumerContext {}
1389
1390#[cfg(test)]
1391mod tests {
1392    use std::sync::Arc;
1393    use std::time::Duration;
1394
1395    use mz_kafka_util::client::create_new_client_config_simple;
1396    use rdkafka::consumer::{BaseConsumer, Consumer};
1397    use rdkafka::{Message, Offset, TopicPartitionList};
1398    use uuid::Uuid;
1399
1400    // Splitting off a partition queue with an `Offset` that is not `Offset::Beginning` seems to
1401    // lead to a race condition where sometimes we receive messages from polling the main consumer
1402    // instead of on the partition queue. This can be surfaced by running the test in a loop (in
1403    // the dataflow directory) using:
1404    //
1405    // cargo stress --lib --release source::kafka::tests::demonstrate_kafka_queue_race_condition
1406    //
1407    // cargo-stress can be installed via `cargo install cargo-stress`
1408    //
1409    // You need to set up a topic "queue-test" with 1000 "hello" messages in it. Obviously, running
1410    // this test requires a running Kafka instance at localhost:9092.
1411    #[mz_ore::test]
1412    #[ignore]
1413    fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
1414        let topic_name = "queue-test";
1415        let pid = 0;
1416
1417        let mut kafka_config = create_new_client_config_simple();
1418        kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
1419        kafka_config.set("enable.auto.commit", "false");
1420        kafka_config.set("group.id", Uuid::new_v4().to_string());
1421        kafka_config.set("fetch.message.max.bytes", "100");
1422        let consumer: BaseConsumer<_> = kafka_config.create()?;
1423
1424        let consumer = Arc::new(consumer);
1425
1426        let mut partition_list = TopicPartitionList::new();
1427        // Using Offset::Beginning here will work fine; only Offset::Offset(0) leads to the
1428        // race condition.
1429        partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;
1430
1431        consumer.assign(&partition_list)?;
1432
1433        let partition_queue = consumer
1434            .split_partition_queue(topic_name, pid)
1435            .expect("missing partition queue");
1436
1437        let expected_messages = 1_000;
1438
1439        let mut common_queue_count = 0;
1440        let mut partition_queue_count = 0;
1441
1442        loop {
1443            if let Some(msg) = consumer.poll(Duration::from_millis(0)) {
1444                match msg {
1445                    Ok(msg) => {
1446                        let _payload =
1447                            std::str::from_utf8(msg.payload().expect("missing payload"))?;
1448                        if partition_queue_count > 0 {
1449                            anyhow::bail!(
1450                                "Got message from common queue after we internally switched to partition queue."
1451                            );
1452                        }
1453
1454                        common_queue_count += 1;
1455                    }
1456                    Err(err) => anyhow::bail!("{}", err),
1457                }
1458            }
1459
1460            match partition_queue.poll(Duration::from_millis(0)) {
1461                Some(Ok(msg)) => {
1462                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
1463                    partition_queue_count += 1;
1464                }
1465                Some(Err(err)) => anyhow::bail!("{}", err),
1466                _ => (),
1467            }
1468
1469            if (common_queue_count + partition_queue_count) == expected_messages {
1470                break;
1471            }
1472        }
1473
1474        assert!(
1475            common_queue_count == 0,
1476            "Got {} out of {} messages from common queue. Partition queue: {}",
1477            common_queue_count,
1478            expected_messages,
1479            partition_queue_count
1480        );
1481
1482        Ok(())
1483    }
1484}
1485
1486/// Fetches the list of partitions and their corresponding high watermarks.
1487fn fetch_partition_info<C: ConsumerContext>(
1488    consumer: &BaseConsumer<C>,
1489    topic: &str,
1490    fetch_timeout: Duration,
1491) -> Result<BTreeMap<PartitionId, HighWatermark>, GetPartitionsError> {
1492    let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;
1493
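    // Ask the broker to resolve `Offset::End` for every partition; the concrete offsets
    // returned are the partitions' current high watermarks.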
1494    let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
1495    for pid in pids {
1496        offset_requests.add_partition_offset(topic, pid, Offset::End)?;
1497    }
1498
1499    let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;
1500
1501    let mut result = BTreeMap::new();
1502    for entry in offset_responses.elements() {
1503        let offset = match entry.offset() {
1504            Offset::Offset(offset) => offset,
1505            offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
1506        };
1507
1508        let pid = entry.partition();
1509        let watermark = offset.try_into().expect("invalid negative offset");
1510        result.insert(pid, watermark);
1511    }
1512
1513    Ok(result)
1514}
1515
1516/// An update produced by the metadata fetcher.
1517#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
1518enum MetadataUpdate {
1519    /// The current IDs and high watermarks of all topic partitions.
1520    Partitions(BTreeMap<PartitionId, HighWatermark>),
1521    /// A transient error.
1522    ///
1523    /// Transient errors stall the source until their cause has been resolved.
1524    TransientError(HealthStatus),
1525    /// A definite error.
1526    ///
1527    /// Definite errors cannot be recovered from. They poison the source until the end of time.
1528    DefiniteError(SourceError),
1529}
1530
1531impl MetadataUpdate {
1532    /// Return the upstream frontier resulting from the metadata update, if any.
1533    fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
1534        match self {
1535            Self::Partitions(partitions) => {
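                // The upstream frontier contains one element per known partition, at that
                // partition's high watermark, plus a range covering any partitions that may
                // be added in the future.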
1536                let max_pid = partitions.keys().last().copied();
1537                let lower = max_pid
1538                    .map(RangeBound::after)
1539                    .unwrap_or(RangeBound::NegInfinity);
1540                let future_ts =
1541                    Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
1542
1543                let mut frontier = Antichain::from_elem(future_ts);
1544                for (pid, high_watermark) in partitions {
1545                    frontier.insert(Partitioned::new_singleton(
1546                        RangeBound::exact(*pid),
1547                        MzOffset::from(*high_watermark),
1548                    ));
1549                }
1550
1551                Some(frontier)
1552            }
1553            Self::DefiniteError(_) => Some(Antichain::new()),
1554            Self::TransientError(_) => None,
1555        }
1556    }
1557}
1558
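/// Errors that can occur while extracting a requested header from a Kafka message.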
1559#[derive(Debug, thiserror::Error)]
1560pub enum KafkaHeaderParseError {
1561    #[error("A header with key '{key}' was not found in the message headers")]
1562    KeyNotFound { key: String },
1563    #[error(
1564        "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
1565    )]
1566    Utf8Error { key: String, raw: Vec<u8> },
1567}
1568
1569/// Render the metadata fetcher of a Kafka source.
1570///
1571/// The metadata fetcher is a single-worker operator that is responsible for periodically fetching
1572/// the Kafka topic metadata (partition IDs and high watermarks) and making it available as a
1573/// Timely stream.
1574fn render_metadata_fetcher<G: Scope<Timestamp = KafkaTimestamp>>(
1575    scope: &G,
1576    connection: KafkaSourceConnection,
1577    config: RawSourceCreationConfig,
1578) -> (
1579    Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
1580    Stream<G, Probe<KafkaTimestamp>>,
1581    PressOnDropButton,
1582) {
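    // The metadata fetcher runs on a single worker, chosen by hashing the source ID.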
1583    let active_worker_id = usize::cast_from(config.id.hashed());
1584    let is_active_worker = active_worker_id % scope.peers() == scope.index();
1585
1586    let resume_upper = Antichain::from_iter(
1587        config
1588            .source_resume_uppers
1589            .values()
1590            .flat_map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row)),
1591    );
1593
1594    let name = format!("KafkaMetadataFetcher({})", config.id);
1595    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
1596
1597    let (metadata_output, metadata_stream) = builder.new_output();
1598    let (probe_output, probe_stream) = builder.new_output();
1599
1600    let button = builder.build(move |caps| async move {
1601        if !is_active_worker {
1602            return;
1603        }
1604
1605        let [metadata_cap, probe_cap] = caps.try_into().unwrap();
1606
1607        let client_id = connection.client_id(
1608            config.config.config_set(),
1609            &config.config.connection_context,
1610            config.id,
1611        );
1612        let KafkaSourceConnection {
1613            connection,
1614            topic,
1615            topic_metadata_refresh_interval,
1616            ..
1617        } = connection;
1618
1619        let consumer: Result<BaseConsumer<_>, _> = connection
1620            .create_with_context(
1621                &config.config,
1622                MzClientContext::default(),
1623                &btreemap! {
1624                    // Use the user-configured topic metadata refresh
1625                    // interval.
1626                    "topic.metadata.refresh.interval.ms" =>
1627                        topic_metadata_refresh_interval
1628                        .as_millis()
1629                        .to_string(),
1630                    // Allow Kafka monitoring tools to identify this
1631                    // consumer.
1632                    "client.id" => format!("{client_id}-metadata"),
1633                },
1634                InTask::Yes,
1635            )
1636            .await;
1637
1638        let consumer = match consumer {
1639            Ok(consumer) => consumer,
1640            Err(e) => {
1641                let msg = format!(
1642                    "failed creating kafka metadata consumer: {}",
1643                    e.display_with_causes()
1644                );
1645                let status_update = HealthStatusUpdate::halting(msg, None);
1646                let status = match e {
1647                    ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
1648                    _ => HealthStatus::kafka(status_update),
1649                };
1650                let error = MetadataUpdate::TransientError(status);
1651                let timestamp = (config.now_fn)().into();
1652                metadata_output.give(&metadata_cap, (timestamp, error));
1653
1654                // IMPORTANT: wedge forever until the `SuspendAndRestart` is processed.
1655                // Returning would present to the remap operator as progress to the empty
1656                // frontier, which would be incorrectly recorded in the remap shard.
1657                std::future::pending::<()>().await;
1658                unreachable!("pending future never returns");
1659            }
1660        };
1661
1662        let (tx, mut rx) = mpsc::unbounded_channel();
1663        spawn_metadata_thread(config, consumer, topic, tx);
1664
1665        let mut prev_upstream_frontier = resume_upper;
1666
1667        while let Some((timestamp, mut update)) = rx.recv().await {
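            // Once the upstream frontier is empty no further progress is possible (e.g.
            // after a definite error), so stop emitting metadata updates.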
1668            if prev_upstream_frontier.is_empty() {
1669                return;
1670            }
1671
1672            if let Some(upstream_frontier) = update.upstream_frontier() {
1673                // Topics are identified by name but it's possible that a user recreates a topic
1674                // with the same name. Ideally we'd want to catch all of these cases and
1675                // immediately error out the source, since the data is effectively gone.
1676                // Unfortunately this is not possible without something like KIP-516.
1677                //
1678                // The best we can do is check whether the upstream frontier regressed. This tells
1679                // us that the topic was recreated and now contains fewer offsets and/or fewer
1680                // partitions. Note that we are not able to detect topic recreation if neither of
1681                // the two are true.
1682                if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
1683                    let error = SourceError {
1684                        error: SourceErrorDetails::Other("topic was recreated".into()),
1685                    };
1686                    update = MetadataUpdate::DefiniteError(error);
1687                }
1688            }
1689
1690            if let Some(upstream_frontier) = update.upstream_frontier() {
1691                prev_upstream_frontier = upstream_frontier.clone();
1692
1693                let probe = Probe {
1694                    probe_ts: timestamp,
1695                    upstream_frontier,
1696                };
1697                probe_output.give(&probe_cap, probe);
1698            }
1699
1700            metadata_output.give(&metadata_cap, (timestamp, update));
1701        }
1702    });
1703
1704    (metadata_stream, probe_stream, button.press_on_drop())
1705}
1706
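/// Spawns a thread that periodically fetches partition metadata for `topic` and forwards
/// the resulting `MetadataUpdate`s, tagged with their probe timestamps, over `tx`. The
/// thread shuts down once the receiving end of `tx` is dropped.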
1707fn spawn_metadata_thread<C: ConsumerContext>(
1708    config: RawSourceCreationConfig,
1709    consumer: BaseConsumer<TunnelingClientContext<C>>,
1710    topic: String,
1711    tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
1712) {
1713    // Linux thread names are limited to 15 characters; use an abbreviated prefix so the name fits.
1714    thread::Builder::new()
1715        .name(format!("kfk-mtdt-{}", config.id))
1716        .spawn(move || {
1717            trace!(
1718                source_id = config.id.to_string(),
1719                worker_id = config.worker_id,
1720                num_workers = config.worker_count,
1721                "kafka metadata thread: starting..."
1722            );
1723
1724            let mut ticker = probe::Ticker::new(
1725                || KAFKA_METADATA_FETCH_INTERVAL.get(config.config.config_set()),
1726                config.now_fn,
1727            );
1728
1729            loop {
1730                let probe_ts = ticker.tick_blocking();
1731                let result = fetch_partition_info(
1732                    &consumer,
1733                    &topic,
1734                    config
1735                        .config
1736                        .parameters
1737                        .kafka_timeout_config
1738                        .fetch_metadata_timeout,
1739                );
1740                trace!(
1741                    source_id = config.id.to_string(),
1742                    worker_id = config.worker_id,
1743                    num_workers = config.worker_count,
1744                    "kafka metadata thread: metadata fetch result: {:?}",
1745                    result
1746                );
1747                let update = match result {
1748                    Ok(partitions) => {
1749                        trace!(
1750                            source_id = config.id.to_string(),
1751                            worker_id = config.worker_id,
1752                            num_workers = config.worker_count,
1753                            "kafka metadata thread: fetched partition metadata info",
1754                        );
1755
1756                        MetadataUpdate::Partitions(partitions)
1757                    }
1758                    Err(GetPartitionsError::TopicDoesNotExist) => {
1759                        let error = SourceError {
1760                            error: SourceErrorDetails::Other("topic was deleted".into()),
1761                        };
1762                        MetadataUpdate::DefiniteError(error)
1763                    }
1764                    Err(e) => {
1765                        let kafka_status = Some(HealthStatusUpdate::stalled(
1766                            format!("{}", e.display_with_causes()),
1767                            None,
1768                        ));
1769
1770                        let ssh_status = consumer.client().context().tunnel_status();
1771                        let ssh_status = match ssh_status {
1772                            SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
1773                            SshTunnelStatus::Errored(e) => {
1774                                Some(HealthStatusUpdate::stalled(e, None))
1775                            }
1776                        };
1777
1778                        MetadataUpdate::TransientError(HealthStatus {
1779                            kafka: kafka_status,
1780                            ssh: ssh_status,
1781                        })
1782                    }
1783                };
1784
1785                if tx.send((probe_ts, update)).is_err() {
1786                    break;
1787                }
1788            }
1789
1790            info!(
1791                source_id = config.id.to_string(),
1792                worker_id = config.worker_id,
1793                num_workers = config.worker_count,
1794                "kafka metadata thread: receiver has gone away; shutting down."
1795            )
1796        })
1797        .unwrap();
1798}