Skip to main content

mz_storage/source/
kafka.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::collections::btree_map::Entry;
12use std::str::{self};
13use std::sync::Arc;
14use std::thread;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use chrono::{DateTime, NaiveDateTime};
19use differential_dataflow::{AsCollection, Hashable};
20use futures::StreamExt;
21use itertools::Itertools;
22use maplit::btreemap;
23use mz_kafka_util::client::{
24    GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
25};
26use mz_ore::assert_none;
27use mz_ore::cast::CastFrom;
28use mz_ore::error::ErrorExt;
29use mz_ore::future::InTask;
30use mz_ore::iter::IteratorExt;
31use mz_repr::adt::timestamp::CheckedTimestamp;
32use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
33use mz_ssh_util::tunnel::SshTunnelStatus;
34use mz_storage_types::errors::{
35    ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
36};
37use mz_storage_types::sources::kafka::{
38    KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
39};
40use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
41use mz_timely_util::antichain::AntichainExt;
42use mz_timely_util::builder_async::{
43    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
44};
45use mz_timely_util::containers::stack::FueledBuilder;
46use mz_timely_util::order::Partitioned;
47use rdkafka::consumer::base_consumer::PartitionQueue;
48use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
49use rdkafka::error::KafkaError;
50use rdkafka::message::{BorrowedMessage, Headers};
51use rdkafka::statistics::Statistics;
52use rdkafka::topic_partition_list::Offset;
53use rdkafka::{ClientContext, Message, TopicPartitionList};
54use serde::{Deserialize, Serialize};
55use timely::PartialOrder;
56use timely::container::CapacityContainerBuilder;
57use timely::dataflow::channels::pact::Pipeline;
58use timely::dataflow::operators::Capability;
59use timely::dataflow::operators::core::Partition;
60use timely::dataflow::operators::vec::Broadcast;
61use timely::dataflow::{Scope, StreamVec};
62use timely::progress::Antichain;
63use timely::progress::Timestamp;
64use tokio::sync::{Notify, mpsc};
65use tracing::{error, info, trace};
66
67use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
68use crate::metrics::source::kafka::KafkaSourceMetrics;
69use crate::source::types::{FuelSize, Probe, SignaledFuture, SourceRender, StackedCollection};
70use crate::source::{RawSourceCreationConfig, SourceMessage, probe};
71use crate::statistics::SourceStatistics;
72
73#[derive(
74    Clone,
75    Debug,
76    Default,
77    PartialEq,
78    Eq,
79    PartialOrd,
80    Ord,
81    Serialize,
82    Deserialize
83)]
84struct HealthStatus {
85    kafka: Option<HealthStatusUpdate>,
86    ssh: Option<HealthStatusUpdate>,
87}
88
89impl HealthStatus {
90    fn kafka(update: HealthStatusUpdate) -> Self {
91        Self {
92            kafka: Some(update),
93            ssh: None,
94        }
95    }
96
97    fn ssh(update: HealthStatusUpdate) -> Self {
98        Self {
99            kafka: None,
100            ssh: Some(update),
101        }
102    }
103}
104
105/// Contains all information necessary to ingest data from Kafka
106pub struct KafkaSourceReader {
107    /// Name of the topic on which this source is backed on
108    topic_name: String,
109    /// Name of the source (will have format kafka-source-id)
110    source_name: String,
111    /// Source global ID
112    id: GlobalId,
113    /// Kafka consumer for this source
114    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
115    /// List of consumers. A consumer should be assigned per partition to guarantee fairness
116    partition_consumers: Vec<PartitionConsumer>,
117    /// Worker ID
118    worker_id: usize,
119    /// Total count of workers
120    worker_count: usize,
121    /// The most recently read offset for each partition known to this source
122    /// reader by output-index. An offset of -1 indicates that no prior message
123    /// has been read for the given partition.
124    last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
125    /// The offset to start reading from for each partition.
126    start_offsets: BTreeMap<PartitionId, i64>,
127    /// Channel to receive Kafka statistics JSON blobs from the stats callback.
128    stats_rx: crossbeam_channel::Receiver<Jsonb>,
129    /// A handle to the partition specific metrics
130    partition_metrics: KafkaSourceMetrics,
131    /// Per partition capabilities used to produce messages
132    partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
133}
134
135struct PartitionCapability {
136    /// The capability of the data produced
137    data: Capability<KafkaTimestamp>,
138}
139
/// A high or low watermark offset of a Kafka partition.
///
/// The low watermark is the offset of the first message available for consumption in the
/// topic/partition; the high watermark is the offset of the latest available message + 1.
type PartitionWatermark = u64;
145
146/// Processes `resume_uppers` stream updates, committing them upstream and
147/// storing them in the `progress_statistics` to be emitted later.
148pub struct KafkaResumeUpperProcessor {
149    config: RawSourceCreationConfig,
150    topic_name: String,
151    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
152    statistics: Vec<SourceStatistics>,
153}
154
155/// Computes whether this worker is responsible for consuming a partition. It assigns partitions to
156/// workers in a round-robin fashion, starting at an arbitrary worker based on the hash of the
157/// source id.
158fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
159    let pid = usize::try_from(pid).expect("positive pid");
160    ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
161}
162
163struct SourceOutputInfo {
164    id: GlobalId,
165    output_index: usize,
166    resume_upper: Antichain<KafkaTimestamp>,
167    metadata_columns: Vec<KafkaMetadataKind>,
168}
169
170impl SourceRender for KafkaSourceConnection {
171    // TODO(petrosagg): The type used for the partition (RangeBound<PartitionId>) doesn't need to
172    // be so complicated and we could instead use `Partitioned<PartitionId, Option<u64>>` where all
173    // ranges are inclusive and a time of `None` signifies that a particular partition is not
174    // present. This requires an shard migration of the remap shard.
175    type Time = KafkaTimestamp;
176
177    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;
178
179    fn render<'scope>(
180        self,
181        scope: Scope<'scope, KafkaTimestamp>,
182        config: &RawSourceCreationConfig,
183        resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
184        start_signal: impl std::future::Future<Output = ()> + 'static,
185    ) -> (
186        BTreeMap<
187            GlobalId,
188            StackedCollection<'scope, KafkaTimestamp, Result<SourceMessage, DataflowError>>,
189        >,
190        StreamVec<'scope, KafkaTimestamp, HealthStatusMessage>,
191        StreamVec<'scope, KafkaTimestamp, Probe<KafkaTimestamp>>,
192        Vec<PressOnDropButton>,
193    ) {
194        let (metadata, probes, metadata_token) =
195            render_metadata_fetcher(scope, self.clone(), config.clone());
196        let (data, health, reader_token) = render_reader(
197            scope,
198            self,
199            config.clone(),
200            resume_uppers,
201            metadata,
202            start_signal,
203        );
204
205        let partition_count = u64::cast_from(config.source_exports.len());
206        let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
207            partition_count,
208            |((output, data), time, diff)| {
209                let output = u64::cast_from(output);
210                (output, (data, time, diff))
211            },
212        );
213        let mut data_collections = BTreeMap::new();
214        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
215            data_collections.insert(*id, data_stream.as_collection());
216        }
217
218        (
219            data_collections,
220            health,
221            probes,
222            vec![metadata_token, reader_token],
223        )
224    }
225}
226
227/// Render the reader of a Kafka source.
228///
229/// The reader is responsible for polling the Kafka topic partitions for new messages, and
230/// transforming them into a `SourceMessage` collection.
231fn render_reader<'scope>(
232    scope: Scope<'scope, KafkaTimestamp>,
233    connection: KafkaSourceConnection,
234    config: RawSourceCreationConfig,
235    resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
236    metadata_stream: StreamVec<'scope, KafkaTimestamp, (mz_repr::Timestamp, MetadataUpdate)>,
237    start_signal: impl std::future::Future<Output = ()> + 'static,
238) -> (
239    StackedCollection<'scope, KafkaTimestamp, (usize, Result<SourceMessage, DataflowError>)>,
240    StreamVec<'scope, KafkaTimestamp, HealthStatusMessage>,
241    PressOnDropButton,
242) {
243    let name = format!("KafkaReader({})", config.id);
244    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
245
246    let (data_output, stream) = builder.new_output::<FueledBuilder<_>>();
247    let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
248
249    let mut metadata_input = builder.new_disconnected_input(metadata_stream.broadcast(), Pipeline);
250
251    let mut outputs = vec![];
252
253    // Contains the `SourceStatistics` entries for exports that require a snapshot.
254    let mut all_export_stats = vec![];
255    let mut snapshot_export_stats = vec![];
256    for (idx, (id, export)) in config.source_exports.iter().enumerate() {
257        let SourceExport {
258            details,
259            storage_metadata: _,
260            data_config: _,
261        } = export;
262        let resume_upper = Antichain::from_iter(
263            config
264                .source_resume_uppers
265                .get(id)
266                .expect("all source exports must be present in source resume uppers")
267                .iter()
268                .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
269        );
270
271        let metadata_columns = match details {
272            SourceExportDetails::Kafka(details) => details
273                .metadata_columns
274                .iter()
275                .map(|(_name, kind)| kind.clone())
276                .collect::<Vec<_>>(),
277            _ => panic!("unexpected source export details: {:?}", details),
278        };
279
280        let statistics = config
281            .statistics
282            .get(id)
283            .expect("statistics have been initialized")
284            .clone();
285        // export requires snapshot
286        if resume_upper.as_ref() == &[Partitioned::minimum()] {
287            snapshot_export_stats.push(statistics.clone());
288        }
289        all_export_stats.push(statistics);
290
291        let output = SourceOutputInfo {
292            id: *id,
293            resume_upper,
294            output_index: idx,
295            metadata_columns,
296        };
297        outputs.push(output);
298    }
299
300    let busy_signal = Arc::clone(&config.busy_signal);
301    let button = builder.build(move |caps| {
302        SignaledFuture::new(busy_signal, async move {
303            let [mut data_cap, health_cap] = caps.try_into().unwrap();
304
305            let client_id = connection.client_id(
306                config.config.config_set(),
307                &config.config.connection_context,
308                config.id,
309            );
310            let group_id = connection.group_id(&config.config.connection_context, config.id);
311            let KafkaSourceConnection {
312                connection,
313                topic,
314                topic_metadata_refresh_interval,
315                start_offsets,
316                metadata_columns: _,
317                // Exhaustive match protects against forgetting to apply an
318                // option. Ignored fields are justified below.
319                connection_id: _,   // not needed here
320                group_id_prefix: _, // used above via `connection.group_id`
321            } = connection;
322
323            info!(
324                source_id = config.id.to_string(),
325                worker_id = config.worker_id,
326                num_workers = config.worker_count,
327                "instantiating Kafka source reader at offsets {start_offsets:?}"
328            );
329
330            let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
331            let notificator = Arc::new(Notify::new());
332
333            let consumer: Result<BaseConsumer<_>, _> = connection
334                .create_with_context(
335                    &config.config,
336                    GlueConsumerContext {
337                        notificator: Arc::clone(&notificator),
338                        stats_tx,
339                        inner: MzClientContext::default(),
340                    },
341                    &btreemap! {
342                        // Disable Kafka auto commit. We manually commit offsets
343                        // to Kafka once we have reclocked those offsets, so
344                        // that users can use standard Kafka tools for progress
345                        // tracking.
346                        "enable.auto.commit" => "false".into(),
347                        // Always begin ingest at 0 when restarted, even if Kafka
348                        // contains committed consumer read offsets
349                        "auto.offset.reset" => "earliest".into(),
350                        // Use the user-configured topic metadata refresh
351                        // interval.
352                        "topic.metadata.refresh.interval.ms" =>
353                            topic_metadata_refresh_interval
354                            .as_millis()
355                            .to_string(),
356                        // TODO: document the rationale for this.
357                        "fetch.message.max.bytes" => "134217728".into(),
358                        // Consumer group ID, which may have been overridden by
359                        // the user. librdkafka requires this, and we use offset
360                        // committing to provide a way for users to monitor
361                        // ingest progress, though we do not rely on the
362                        // committed offsets for any functionality.
363                        "group.id" => group_id.clone(),
364                        // Allow Kafka monitoring tools to identify this
365                        // consumer.
366                        "client.id" => client_id.clone(),
367                    },
368                    InTask::Yes,
369                )
370                .await;
371
372            let consumer = match consumer {
373                Ok(consumer) => Arc::new(consumer),
374                Err(e) => {
375                    let update = HealthStatusUpdate::halting(
376                        format!(
377                            "failed creating kafka reader consumer: {}",
378                            e.display_with_causes()
379                        ),
380                        None,
381                    );
382                    health_output.give(
383                        &health_cap,
384                        HealthStatusMessage {
385                            id: None,
386                            namespace: if matches!(e, ContextCreationError::Ssh(_)) {
387                                StatusNamespace::Ssh
388                            } else {
389                                StatusNamespace::Kafka
390                            },
391                            update: update.clone(),
392                        },
393                    );
394                    for (output, update) in outputs.iter().repeat_clone(update) {
395                        health_output.give(
396                            &health_cap,
397                            HealthStatusMessage {
398                                id: Some(output.id),
399                                namespace: if matches!(e, ContextCreationError::Ssh(_)) {
400                                    StatusNamespace::Ssh
401                                } else {
402                                    StatusNamespace::Kafka
403                                },
404                                update,
405                            },
406                        );
407                    }
408                    // IMPORTANT: wedge forever until the `SuspendAndRestart` is processed.
409                    // Returning would incorrectly present to the remap operator as progress to the
410                    // empty frontier which would be incorrectly recorded to the remap shard.
411                    std::future::pending::<()>().await;
412                    unreachable!("pending future never returns");
413                }
414            };
415
416            // Start offsets is a map from partition to the next offset to read from.
417            let mut start_offsets: BTreeMap<_, u64> = start_offsets
418                .clone()
419                .into_iter()
420                .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
421                .map(|(pid, offset)| (pid, u64::try_from(offset).expect("start offsets must be non-negative and fit into u64")))
422                .collect();
423
424            let mut partition_capabilities = BTreeMap::new();
425            let mut max_pid = None;
426            let resume_upper = Antichain::from_iter(
427                outputs
428                    .iter()
429                    .map(|output| output.resume_upper.clone())
430                    .flatten(),
431            );
432
433            tracing::info!(
434                source_id = config.id.to_string(),
435                worker_id = config.worker_id,
436                num_workers = config.worker_count,
437                "Kafka source reader starting rehydration with resume upper: {resume_upper:?} and start offsets: {start_offsets:?}"
438            );
439
440            for ts in resume_upper.elements() {
441                if let Some(pid) = ts.interval().singleton() {
442                    let pid = pid.unwrap_exact();
443                    max_pid = std::cmp::max(max_pid, Some(*pid));
444
445                    if responsible_for_pid(&config, *pid) {
446                        let restored_offset = ts.timestamp().offset;
447                        if let Some(start_offset) = start_offsets.get_mut(pid) {
448                            *start_offset = std::cmp::max(restored_offset, *start_offset);
449                        } else {
450                            start_offsets.insert(*pid, restored_offset);
451                        }
452
453                        let part_ts = Partitioned::new_singleton(
454                            RangeBound::exact(*pid),
455                            ts.timestamp().clone(),
456                        );
457                        let part_cap = PartitionCapability {
458                            data: data_cap.delayed(&part_ts),
459                        };
460                        partition_capabilities.insert(*pid, part_cap);
461                    }
462                }
463            }
464            let lower = max_pid
465                .map(RangeBound::after)
466                .unwrap_or(RangeBound::NegInfinity);
467            let future_ts =
468                Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
469            data_cap.downgrade(&future_ts);
470
471            if mz_storage_types::dyncfgs::KAFKA_LOW_WATERMARK_CHECK
472                .get(config.config.config_set())
473            {
474                let low_watermarks = fetch_partition_info(
475                    &consumer,
476                    topic.as_str(),
477                    config
478                        .config
479                        .parameters
480                        .kafka_timeout_config
481                        .fetch_metadata_timeout,
482                    Offset::Beginning, // fetch the low watermark
483                )
484                .unwrap_or_else(|e| {
485                    tracing::warn!(
486                        source_id = config.id.to_string(),
487                        worker_id = config.worker_id,
488                        num_workers = config.worker_count,
489                        "Failed to fetch watermarks for topic {topic}: {e}"
490                    );
491                    let update = HealthStatusUpdate::stalled(
492                        format!("Failed to fetch watermarks for topic {topic}: {e}"),
493                        None,
494                    );
495                    health_output.give(
496                        &health_cap,
497                        HealthStatusMessage {
498                            id: None,
499                            namespace: StatusNamespace::Kafka,
500                            update: update.clone(),
501                        },
502                    );
503                    for (output, update) in outputs.iter().repeat_clone(update) {
504                        health_output.give(
505                            &health_cap,
506                            HealthStatusMessage {
507                                id: Some(output.id),
508                                namespace: StatusNamespace::Kafka,
509                                update,
510                            },
511                        );
512                    }
513                    let ssh_update = match consumer.client().context().tunnel_status() {
514                        SshTunnelStatus::Running => HealthStatusUpdate::running(),
515                        SshTunnelStatus::Errored(e) => HealthStatusUpdate::stalled(e, None),
516                    };
517                    health_output.give(
518                        &health_cap,
519                        HealthStatusMessage {
520                            id: None,
521                            namespace: StatusNamespace::Ssh,
522                            update: ssh_update.clone(),
523                        },
524                    );
525                    for (output, ssh_update) in outputs.iter().repeat_clone(ssh_update) {
526                        health_output.give(
527                            &health_cap,
528                            HealthStatusMessage {
529                                id: Some(output.id),
530                                namespace: StatusNamespace::Ssh,
531                                update: ssh_update,
532                            },
533                        );
534                    }
535                    if let GetPartitionsError::TopicDoesNotExist = e {
536                        // If the topic doesn't exist, that is a definite error
537                        let error = Err(SourceError {
538                            error: SourceErrorDetails::Initialization(e.to_string().into()),
539                        }
540                        .into());
541                        let time = data_cap.time().clone();
542                        for (output, error) in
543                            outputs.iter().map(|o| o.output_index).repeat_clone(error)
544                        {
545                            let update = ((output, error), time.clone(), Diff::ONE);
546                            data_output.give(&data_cap, update);
547                        }
548                    }
549                    BTreeMap::new()
550                });
551                for (pid, lwm) in &low_watermarks {
552                    if responsible_for_pid(&config, *pid) {
553                        // If a start offset exists for this partition, then either the user specified it
554                        // or we restored it from the resume upper. In either case, if the low watermark is
555                        // greater than the start offset, we know for certain that the offset we need to
556                        // start at has been compacted away or dropped from retention by Kafka. If there
557                        // is no start offset, then we set it to the low watermark and start consuming from
558                        // there, assuming the user doesn't care about the messages that have been compacted away.
559                        if let Some(start_offset) = start_offsets.get_mut(pid) {
560                            tracing::info!(
561                                source_id = config.id.to_string(),
562                                worker_id = config.worker_id,
563                                num_workers = config.worker_count,
564                                "restored offset {start_offset} for topic {topic} partition {pid} with low watermark {lwm}"
565                            );
566                            if lwm > start_offset {
567                                tracing::error!(
568                                    source_id = config.id.to_string(),
569                                    worker_id = config.worker_id,
570                                    num_workers = config.worker_count,
571                                    "start offset and resume upper {start_offset} for topic {topic} \
572                                    partition {pid} is behind the low watermark {lwm}. This likely \
573                                    means that the offsets have been compacted away by Kafka."
574                                );
575                                let err_str = format!(
576                                    "Low watermark {lwm} of kafka topic {topic} partition {pid} \
577                                    is past the start offset/resume upper: {start_offset} \
578                                    This likely means that the offsets have been compacted away \
579                                    by Kafka. Please consider setting a higher start offset or \
580                                    adjusting your retention policies to prevent this.",
581                                );
582
583                                let update = HealthStatusUpdate::stalled(
584                                    err_str.clone(),
585                                    None,
586                                );
587                                health_output.give(
588                                    &health_cap,
589                                    HealthStatusMessage {
590                                        id: None,
591                                        namespace: StatusNamespace::Kafka,
592                                        update: update.clone(),
593                                    },
594                                );
595                                let error = Err(
596                                    SourceError{
597                                        error:SourceErrorDetails::Initialization(err_str.into())
598                                    }.into()
599                                );
600                                let time = data_cap.time().clone();
601                                for (output, error) in
602                                    outputs.iter().map(|o| o.output_index).repeat_clone(error)
603                                {
604                                    let update = ((output, error), time.clone(), Diff::ONE);
605                                    let size = update.fuel_size();
606                                    data_output
607                                        .give_fueled(&data_cap, update, size)
608                                        .await;
609                                }
610                                return;
611                            }
612                        } else {
613                            tracing::warn!(
614                                source_id = config.id.to_string(),
615                                worker_id = config.worker_id,
616                                num_workers = config.worker_count,
617                                "partition {pid} has a non-zero low watermark {lwm}, but no start offset or \
618                                resume upper was found for this partition. Setting start offset to low watermark"
619                            );
620                            start_offsets.insert(*pid, *lwm);
621                        }
622                    }
623                }
624            }
625
626            // Note that we wait for this AFTER we downgrade to the source `resume_upper`. This
627            // allows downstream operators (namely, the `reclock_operator`) to downgrade to the
628            // `resume_upper`, which is necessary for this basic form of backpressure to work.
629            start_signal.await;
630            info!(
631                source_id = config.id.to_string(),
632                worker_id = config.worker_id,
633                num_workers = config.worker_count,
634                "kafka worker noticed rehydration is finished, starting partition queues..."
635            );
636
637            let partition_ids = start_offsets.keys().copied().collect();
638            let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);
639            let start_offsets = start_offsets.iter().map(|(pid, offset)| (*pid, i64::try_from(*offset).expect("start offsets must fit into i64"))).collect();
640
641            let mut reader = KafkaSourceReader {
642                topic_name: topic.clone(),
643                source_name: config.name.clone(),
644                id: config.id,
645                partition_consumers: Vec::new(),
646                consumer: Arc::clone(&consumer),
647                worker_id: config.worker_id,
648                worker_count: config.worker_count,
649                last_offsets: outputs
650                    .iter()
651                    .map(|output| (output.output_index, BTreeMap::new()))
652                    .collect(),
653                start_offsets,
654                stats_rx,
655                partition_metrics: config.metrics.get_kafka_source_metrics(
656                    partition_ids,
657                    topic.clone(),
658                    config.id,
659                ),
660                partition_capabilities,
661            };
662
663            let offset_committer = KafkaResumeUpperProcessor {
664                config: config.clone(),
665                topic_name: topic.clone(),
666                consumer,
667                statistics: all_export_stats.clone(),
668            };
669
670            // Seed the progress metrics with `0` if we are snapshotting.
671            if !snapshot_export_stats.is_empty() {
672                if let Err(e) = offset_committer
673                    .process_frontier(resume_upper.clone())
674                    .await
675                {
676                    offset_commit_metrics.offset_commit_failures.inc();
677                    tracing::warn!(
678                        %e,
679                        "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
680                        worker_id = config.worker_id,
681                        source_id = config.id,
682                        upper = resume_upper.pretty()
683                    );
684                }
685                // Reset snapshot statistics for any exports that are not involved
686                // in this round of snapshotting. Those that are snapshotting this round will
687                // see updates as the snapshot commences.
688                for statistics in config.statistics.values() {
689                    statistics.set_snapshot_records_known(0);
690                    statistics.set_snapshot_records_staged(0);
691                }
692            }
693
694            let resume_uppers_process_loop = async move {
695                tokio::pin!(resume_uppers);
696                while let Some(frontier) = resume_uppers.next().await {
697                    if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
698                        offset_commit_metrics.offset_commit_failures.inc();
699                        tracing::warn!(
700                            %e,
701                            "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
702                            worker_id = config.worker_id,
703                            source_id = config.id,
704                            upper = frontier.pretty()
705                        );
706                    }
707                }
708                // During dataflow shutdown this loop can end due to the general chaos caused by
709                // dropping tokens as a means to shutdown. This call ensures this future never ends
710                // and we instead rely on this operator being dropped altogether when *its* token
711                // is dropped.
712                std::future::pending::<()>().await;
713            };
714            tokio::pin!(resume_uppers_process_loop);
715
716            let mut metadata_update: Option<MetadataUpdate> = None;
717            let mut snapshot_total = None;
718
719            let max_wait_time =
720                mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
721            loop {
722                // Wait for data or metadata events while also making progress with offset
723                // committing.
724                tokio::select! {
725                    // TODO(petrosagg): remove the timeout and rely purely on librdkafka waking us
726                    // up
727                    _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},
728
729                    _ = metadata_input.ready() => {
730                        // Collect all pending updates, then only keep the most recent one.
731                        let mut updates = Vec::new();
732                        while let Some(event) = metadata_input.next_sync() {
733                            if let Event::Data(_, mut data) = event {
734                                updates.append(&mut data);
735                            }
736                        }
737                        metadata_update = updates
738                            .into_iter()
739                            .max_by_key(|(ts, _)| *ts)
740                            .map(|(_, update)| update);
741                    }
742
743                    // This future is not cancel safe but we are only passing a reference to it in
744                    // the select! loop so the future stays on the stack and never gets cancelled
745                    // until the end of the function.
746                    _ = resume_uppers_process_loop.as_mut() => {},
747                }
748
749                match metadata_update.take() {
750                    Some(MetadataUpdate::Partitions(partitions)) => {
751                        let max_pid = partitions.keys().last().cloned();
752                        let lower = max_pid
753                            .map(RangeBound::after)
754                            .unwrap_or(RangeBound::NegInfinity);
755                        let future_ts = Partitioned::new_range(
756                            lower,
757                            RangeBound::PosInfinity,
758                            MzOffset::from(0),
759                        );
760
761                        let mut offset_known = 0;
762                        for (&pid, &high_watermark) in &partitions {
763                            if responsible_for_pid(&config, pid) {
764                                offset_known += high_watermark;
765                                reader.ensure_partition(pid);
766                                if let Entry::Vacant(entry) =
767                                    reader.partition_capabilities.entry(pid)
768                                {
769                                    let start_offset = match reader.start_offsets.get(&pid) {
770                                        Some(&offset) => offset.try_into().unwrap(),
771                                        None => 0u64,
772                                    };
773                                    let part_since_ts = Partitioned::new_singleton(
774                                        RangeBound::exact(pid),
775                                        MzOffset::from(start_offset),
776                                    );
777
778                                    entry.insert(PartitionCapability {
779                                        data: data_cap.delayed(&part_since_ts),
780                                    });
781                                }
782                            }
783                        }
784
785                        // If we are snapshotting, record our first set of partitions as the snapshot
786                        // size.
787                        if !snapshot_export_stats.is_empty() && snapshot_total.is_none() {
788                            // Note that we want to represent the _number of offsets_, which
789                            // means the watermark's frontier semantics is correct, without
790                            // subtracting (Kafka offsets start at 0).
791                            snapshot_total = Some(offset_known);
792                        }
793
794                        // Clear all the health namespaces we know about.
795                        // Note that many kafka sources's don't have an ssh tunnel, but the
796                        // `health_operator` handles this fine.
797                        for output in &outputs {
798                            for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
799                                health_output.give(
800                                    &health_cap,
801                                    HealthStatusMessage {
802                                        id: Some(output.id),
803                                        namespace,
804                                        update: HealthStatusUpdate::running(),
805                                    },
806                                );
807                            }
808                        }
809                        for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
810                            health_output.give(
811                                &health_cap,
812                                HealthStatusMessage {
813                                    id: None,
814                                    namespace,
815                                    update: HealthStatusUpdate::running(),
816                                },
817                            );
818                        }
819
820                        for export_stat in all_export_stats.iter() {
821                            export_stat.set_offset_known(offset_known);
822                        }
823
824                        data_cap.downgrade(&future_ts);
825                    }
826                    Some(MetadataUpdate::TransientError(status)) => {
827                        if let Some(update) = status.kafka {
828                            health_output.give(
829                                &health_cap,
830                                HealthStatusMessage {
831                                    id: None,
832                                    namespace: StatusNamespace::Kafka,
833                                    update: update.clone(),
834                                },
835                            );
836                            for (output, update) in outputs.iter().repeat_clone(update) {
837                                health_output.give(
838                                    &health_cap,
839                                    HealthStatusMessage {
840                                        id: Some(output.id),
841                                        namespace: StatusNamespace::Kafka,
842                                        update,
843                                    },
844                                );
845                            }
846                        }
847                        if let Some(update) = status.ssh {
848                            health_output.give(
849                                &health_cap,
850                                HealthStatusMessage {
851                                    id: None,
852                                    namespace: StatusNamespace::Ssh,
853                                    update: update.clone(),
854                                },
855                            );
856                            for (output, update) in outputs.iter().repeat_clone(update) {
857                                health_output.give(
858                                    &health_cap,
859                                    HealthStatusMessage {
860                                        id: Some(output.id),
861                                        namespace: StatusNamespace::Ssh,
862                                        update,
863                                    },
864                                );
865                            }
866                        }
867                    }
868                    Some(MetadataUpdate::DefiniteError(error)) => {
869                        health_output.give(
870                            &health_cap,
871                            HealthStatusMessage {
872                                id: None,
873                                namespace: StatusNamespace::Kafka,
874                                update: HealthStatusUpdate::stalled(
875                                    error.to_string(),
876                                    None,
877                                ),
878                            },
879                        );
880                        let error = Err(error.into());
881                        let time = data_cap.time().clone();
882                        for (output, error) in
883                            outputs.iter().map(|o| o.output_index).repeat_clone(error)
884                        {
885                            let update = ((output, error), time, Diff::ONE);
886                            let size = update.fuel_size();
887                            data_output
888                                .give_fueled(&data_cap, update, size)
889                                .await;
890                        }
891
892                        return;
893                    }
894                    None => {}
895                }
896
897                // Poll the consumer once. We split the consumer's partitions out into separate
898                // queues and poll those individually, but it's still necessary to drive logic that
899                // consumes from rdkafka's internal event queue, such as statistics callbacks.
900                //
901                // Additionally, assigning topics and splitting them off into separate queues is
902                // not atomic, so we expect to see at least some messages to show up when polling
903                // the consumer directly.
904                while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
905                    match result {
906                        Err(e) => {
907                            let error = format!(
908                                "kafka error when polling consumer for source: {} topic: {} : {}",
909                                reader.source_name, reader.topic_name, e
910                            );
911                            let status = HealthStatusUpdate::stalled(error, None);
912                            health_output.give(
913                                &health_cap,
914                                HealthStatusMessage {
915                                    id: None,
916                                    namespace: StatusNamespace::Kafka,
917                                    update: status.clone(),
918                                },
919                            );
920                            for (output, status) in outputs.iter().repeat_clone(status) {
921                                health_output.give(
922                                    &health_cap,
923                                    HealthStatusMessage {
924                                        id: Some(output.id),
925                                        namespace: StatusNamespace::Kafka,
926                                        update: status,
927                                    },
928                                );
929                            }
930                        }
931                        Ok(message) => {
932                            let output_messages = outputs
933                                .iter()
934                                .map(|output| {
935                                    let (message, ts) = construct_source_message(
936                                        &message,
937                                        &output.metadata_columns,
938                                    );
939                                    (output.output_index, message, ts)
940                                })
941                                // This vec allocation is required to allow obtaining a `&mut`
942                                // on `reader` for the `reader.handle_message` call in the
943                                // loop below since  `message` is borrowed from `reader`.
944                                .collect::<Vec<_>>();
945                            for (output_index, message, ts) in output_messages {
946                                if let Some((msg, time, diff)) =
947                                    reader.handle_message(message, ts, &output_index)
948                                {
949                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
950                                    let part_cap = &reader.partition_capabilities[pid].data;
951                                    let msg = msg.map_err(|e| {
952                                        DataflowError::SourceError(Box::new(SourceError {
953                                            error: SourceErrorDetails::Other(e.to_string().into()),
954                                        }))
955                                    });
956                                    let update = ((output_index, msg), time, diff);
957                                    let size = update.fuel_size();
958                                    data_output
959                                        .give_fueled(part_cap, update, size)
960                                        .await;
961                                }
962                            }
963                        }
964                    }
965                }
966
967                reader.update_stats();
968
969                // Take the consumers temporarily to get around borrow checker errors
970                let mut consumers = std::mem::take(&mut reader.partition_consumers);
971                for consumer in consumers.iter_mut() {
972                    let pid = consumer.pid();
973                    // We want to make sure the rest of the actions in the outer loops get
974                    // a chance to run. If rdkafka keeps pumping data at us we might find
975                    // ourselves in a situation where we keep dumping data into the
976                    // dataflow without signaling progress. For this reason we consume at most
977                    // 10k messages from each partition and go around the loop.
978                    let mut partition_exhausted = false;
979                    for _ in 0..10_000 {
980                        let Some(message) = consumer.get_next_message().transpose() else {
981                            partition_exhausted = true;
982                            break;
983                        };
984
985                        for output in outputs.iter() {
986                            let message = match &message {
987                                Ok((msg, pid)) => {
988                                    let (msg, ts) =
989                                        construct_source_message(msg, &output.metadata_columns);
990                                    assert_eq!(*pid, ts.0);
991                                    Ok(reader.handle_message(msg, ts, &output.output_index))
992                                }
993                                Err(err) => Err(err),
994                            };
995                            match message {
996                                Ok(Some((msg, time, diff))) => {
997                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
998                                    let part_cap = &reader.partition_capabilities[pid].data;
999                                    let msg = msg.map_err(|e| {
1000                                        DataflowError::SourceError(Box::new(SourceError {
1001                                            error: SourceErrorDetails::Other(e.to_string().into()),
1002                                        }))
1003                                    });
1004                                    let update =
1005                                        ((output.output_index, msg), time, diff);
1006                                    let size = update.fuel_size();
1007                                    data_output
1008                                        .give_fueled(part_cap, update, size)
1009                                        .await;
1010                                }
1011                                // The message was from an offset we've already seen.
1012                                Ok(None) => continue,
1013                                Err(err) => {
1014                                    let last_offset = reader
1015                                        .last_offsets
1016                                        .get(&output.output_index)
1017                                        .expect("output known to be installed")
1018                                        .get(&pid)
1019                                        .expect("partition known to be installed");
1020
1021                                    let status = HealthStatusUpdate::stalled(
1022                                        format!(
1023                                            "error consuming from source: {} topic: {topic}:\
1024                                             partition: {pid} last processed offset:\
1025                                             {last_offset} : {err}",
1026                                            config.name
1027                                        ),
1028                                        None,
1029                                    );
1030                                    health_output.give(
1031                                        &health_cap,
1032                                        HealthStatusMessage {
1033                                            id: None,
1034                                            namespace: StatusNamespace::Kafka,
1035                                            update: status.clone(),
1036                                        },
1037                                    );
1038                                    health_output.give(
1039                                        &health_cap,
1040                                        HealthStatusMessage {
1041                                            id: Some(output.id),
1042                                            namespace: StatusNamespace::Kafka,
1043                                            update: status,
1044                                        },
1045                                    );
1046                                }
1047                            }
1048                        }
1049                    }
1050                    if !partition_exhausted {
1051                        notificator.notify_one();
1052                    }
1053                }
1054                // We can now put them back
1055                assert!(reader.partition_consumers.is_empty());
1056                reader.partition_consumers = consumers;
1057
1058                let positions = reader.consumer.position().unwrap();
1059                let topic_positions = positions.elements_for_topic(&reader.topic_name);
1060                let mut snapshot_staged = 0;
1061
1062                for position in topic_positions {
1063                    // The offset begins in the `Offset::Invalid` state in which case we simply
1064                    // skip this partition.
1065                    if let Offset::Offset(offset) = position.offset() {
1066                        let pid = position.partition();
1067                        let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
1068                        let upper =
1069                            Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);
1070
1071                        let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
1072                        match part_cap.data.try_downgrade(&upper) {
1073                            Ok(()) => {
1074                                if !snapshot_export_stats.is_empty() {
1075                                    // The `.position()` of the consumer represents what offset we have
1076                                    // read up to.
1077                                    snapshot_staged += offset.try_into().unwrap_or(0u64);
1078                                    // This will always be `Some` at this point.
1079                                    if let Some(snapshot_total) = snapshot_total {
1080                                        // We will eventually read past the snapshot total, so we need
1081                                        // to bound it here.
1082                                        snapshot_staged =
1083                                            std::cmp::min(snapshot_staged, snapshot_total);
1084                                    }
1085                                }
1086                            }
1087                            Err(_) => {
1088                                // If we can't downgrade, it means we have already seen this offset.
1089                                // This is expected and we can safely ignore it.
1090                                info!(
1091                                    source_id = config.id.to_string(),
1092                                    worker_id = config.worker_id,
1093                                    num_workers = config.worker_count,
1094                                    "kafka source frontier downgrade skipped due to already \
1095                                     seen offset: {:?}",
1096                                    upper
1097                                );
1098                            }
1099                        };
1100
1101                    }
1102                }
1103
1104                if let (Some(snapshot_total), true) =
1105                    (snapshot_total, !snapshot_export_stats.is_empty())
1106                {
1107                    for export_stat in snapshot_export_stats.iter() {
1108                        export_stat.set_snapshot_records_known(snapshot_total);
1109                        export_stat.set_snapshot_records_staged(snapshot_staged);
1110                    }
1111                    if snapshot_total == snapshot_staged {
1112                        snapshot_export_stats.clear();
1113                    }
1114                }
1115            }
1116        })
1117    });
1118
1119    (
1120        stream.as_collection(),
1121        health_stream,
1122        button.press_on_drop(),
1123    )
1124}
1125
1126impl KafkaResumeUpperProcessor {
1127    async fn process_frontier(
1128        &self,
1129        frontier: Antichain<KafkaTimestamp>,
1130    ) -> Result<(), anyhow::Error> {
1131        use rdkafka::consumer::CommitMode;
1132
1133        // Generate a list of partitions that this worker is responsible for
1134        let mut offsets = vec![];
1135        let mut offset_committed = 0;
1136        for ts in frontier.iter() {
1137            if let Some(pid) = ts.interval().singleton() {
1138                let pid = pid.unwrap_exact();
1139                if responsible_for_pid(&self.config, *pid) {
1140                    offsets.push((pid.clone(), *ts.timestamp()));
1141
1142                    // Note that we do not subtract 1 from the frontier. Imagine
1143                    // that frontier is 2 for this pid. That means we have
1144                    // full processed offset 0 and offset 1, which means we have
1145                    // processed _2_ offsets.
1146                    offset_committed += ts.timestamp().offset;
1147                }
1148            }
1149        }
1150
1151        for export_stat in self.statistics.iter() {
1152            export_stat.set_offset_committed(offset_committed);
1153        }
1154
1155        if !offsets.is_empty() {
1156            let mut tpl = TopicPartitionList::new();
1157            for (pid, offset) in offsets {
1158                let offset_to_commit =
1159                    Offset::Offset(offset.offset.try_into().expect("offset to be vald i64"));
1160                tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
1161                    .expect("offset known to be valid");
1162            }
1163            let consumer = Arc::clone(&self.consumer);
1164            mz_ore::task::spawn_blocking(
1165                || format!("source({}) kafka offset commit", self.config.id),
1166                move || consumer.commit(&tpl, CommitMode::Sync),
1167            )
1168            .await?;
1169        }
1170        Ok(())
1171    }
1172}
1173
1174impl KafkaSourceReader {
1175    /// Ensures that a partition queue for `pid` exists.
1176    fn ensure_partition(&mut self, pid: PartitionId) {
1177        if self.last_offsets.is_empty() {
1178            tracing::info!(
1179                source_id = %self.id,
1180                worker_id = %self.worker_id,
1181                "kafka source does not have any outputs, not creating partition queue");
1182
1183            return;
1184        }
1185        for last_offsets in self.last_offsets.values() {
1186            // early exit if we've already inserted this partition
1187            if last_offsets.contains_key(&pid) {
1188                return;
1189            }
1190        }
1191
1192        let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
1193        self.create_partition_queue(pid, Offset::Offset(start_offset));
1194
1195        for last_offsets in self.last_offsets.values_mut() {
1196            let prev = last_offsets.insert(pid, start_offset - 1);
1197            assert_none!(prev);
1198        }
1199    }
1200
1201    /// Creates a new partition queue for `partition_id`.
1202    fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
1203        info!(
1204            source_id = self.id.to_string(),
1205            worker_id = self.worker_id,
1206            num_workers = self.worker_count,
1207            "activating Kafka queue for topic {}, partition {}",
1208            self.topic_name,
1209            partition_id,
1210        );
1211
1212        // Collect old partition assignments
1213        let tpl = self.consumer.assignment().unwrap();
1214        // Create list from assignments
1215        let mut partition_list = TopicPartitionList::new();
1216        for partition in tpl.elements_for_topic(&self.topic_name) {
1217            partition_list
1218                .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
1219                .expect("offset known to be valid");
1220        }
1221        // Add new partition
1222        partition_list
1223            .add_partition_offset(&self.topic_name, partition_id, initial_offset)
1224            .expect("offset known to be valid");
1225        self.consumer
1226            .assign(&partition_list)
1227            .expect("assignment known to be valid");
1228
1229        // Since librdkafka v1.6.0, we need to recreate all partition queues
1230        // after every call to `self.consumer.assign`.
1231        let context = Arc::clone(self.consumer.context());
1232        for pc in &mut self.partition_consumers {
1233            pc.partition_queue = self
1234                .consumer
1235                .split_partition_queue(&self.topic_name, pc.pid)
1236                .expect("partition known to be valid");
1237            pc.partition_queue.set_nonempty_callback({
1238                let context = Arc::clone(&context);
1239                move || context.inner().activate()
1240            });
1241        }
1242
1243        let mut partition_queue = self
1244            .consumer
1245            .split_partition_queue(&self.topic_name, partition_id)
1246            .expect("partition known to be valid");
1247        partition_queue.set_nonempty_callback(move || context.inner().activate());
1248        self.partition_consumers
1249            .push(PartitionConsumer::new(partition_id, partition_queue));
1250        assert_eq!(
1251            self.consumer
1252                .assignment()
1253                .unwrap()
1254                .elements_for_topic(&self.topic_name)
1255                .len(),
1256            self.partition_consumers.len()
1257        );
1258    }
1259
1260    /// Read any statistics JSON blobs generated via the rdkafka statistics callback.
1261    fn update_stats(&mut self) {
1262        while let Ok(stats) = self.stats_rx.try_recv() {
1263            match serde_json::from_str::<Statistics>(&stats.to_string()) {
1264                Ok(statistics) => {
1265                    let topic = statistics.topics.get(&self.topic_name);
1266                    match topic {
1267                        Some(topic) => {
1268                            for (id, partition) in &topic.partitions {
1269                                self.partition_metrics
1270                                    .set_offset_max(*id, partition.hi_offset);
1271                            }
1272                        }
1273                        None => error!("No stats found for topic: {}", &self.topic_name),
1274                    }
1275                }
1276                Err(e) => {
1277                    error!("failed decoding librdkafka statistics JSON: {}", e);
1278                }
1279            }
1280        }
1281    }
1282
1283    /// Checks if the given message is viable for emission. This checks if the message offset is
1284    /// past the expected offset and returns None if it is not.
1285    fn handle_message(
1286        &mut self,
1287        message: Result<SourceMessage, KafkaHeaderParseError>,
1288        (partition, offset): (PartitionId, MzOffset),
1289        output_index: &usize,
1290    ) -> Option<(
1291        Result<SourceMessage, KafkaHeaderParseError>,
1292        KafkaTimestamp,
1293        Diff,
1294    )> {
1295        // Offsets are guaranteed to be 1) monotonically increasing *unless* there is
1296        // a network issue or a new partition added, at which point the consumer may
1297        // start processing the topic from the beginning, or we may see duplicate offsets
1298        // At all times, the guarantee : if we see offset x, we have seen all offsets [0,x-1]
1299        // that we are ever going to see holds.
1300        // Offsets are guaranteed to be contiguous when compaction is disabled. If compaction
1301        // is enabled, there may be gaps in the sequence.
1302        // If we see an "old" offset, we skip that message.
1303
1304        // Given the explicit consumer to partition assignment, we should never receive a message
1305        // for a partition for which we have no metadata
1306        assert!(
1307            self.last_offsets
1308                .get(output_index)
1309                .unwrap()
1310                .contains_key(&partition)
1311        );
1312
1313        let last_offset_ref = self
1314            .last_offsets
1315            .get_mut(output_index)
1316            .expect("output known to be installed")
1317            .get_mut(&partition)
1318            .expect("partition known to be installed");
1319
1320        let last_offset = *last_offset_ref;
1321        let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
1322        if offset_as_i64 <= last_offset {
1323            info!(
1324                source_id = self.id.to_string(),
1325                worker_id = self.worker_id,
1326                num_workers = self.worker_count,
1327                "kafka message before expected offset: \
1328                 source {} (reading topic {}, partition {}, output {}) \
1329                 received offset {} expected offset {:?}",
1330                self.source_name,
1331                self.topic_name,
1332                partition,
1333                output_index,
1334                offset.offset,
1335                last_offset + 1,
1336            );
1337            // We explicitly should not consume the message as we have already processed it.
1338            None
1339        } else {
1340            *last_offset_ref = offset_as_i64;
1341
1342            let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
1343            Some((message, ts, Diff::ONE))
1344        }
1345    }
1346}
1347
1348fn construct_source_message(
1349    msg: &BorrowedMessage<'_>,
1350    metadata_columns: &[KafkaMetadataKind],
1351) -> (
1352    Result<SourceMessage, KafkaHeaderParseError>,
1353    (PartitionId, MzOffset),
1354) {
1355    let pid = msg.partition();
1356    let Ok(offset) = u64::try_from(msg.offset()) else {
1357        panic!(
1358            "got negative offset ({}) from otherwise non-error'd kafka message",
1359            msg.offset()
1360        );
1361    };
1362
1363    let mut metadata = Row::default();
1364    let mut packer = metadata.packer();
1365    for kind in metadata_columns {
1366        match kind {
1367            KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
1368            KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
1369            KafkaMetadataKind::Timestamp => {
1370                let ts = msg
1371                    .timestamp()
1372                    .to_millis()
1373                    .expect("kafka sources always have upstream_time");
1374
1375                let d: Datum = DateTime::from_timestamp_millis(ts)
1376                    .and_then(|dt| {
1377                        let ct: Option<CheckedTimestamp<NaiveDateTime>> =
1378                            dt.naive_utc().try_into().ok();
1379                        ct
1380                    })
1381                    .into();
1382                packer.push(d)
1383            }
1384            KafkaMetadataKind::Header { key, use_bytes } => {
1385                match msg.headers() {
1386                    Some(headers) => {
1387                        let d = headers
1388                            .iter()
1389                            .filter(|header| header.key == key)
1390                            .last()
1391                            .map(|header| match header.value {
1392                                Some(v) => {
1393                                    if *use_bytes {
1394                                        Ok(Datum::Bytes(v))
1395                                    } else {
1396                                        match str::from_utf8(v) {
1397                                            Ok(str) => Ok(Datum::String(str)),
1398                                            Err(_) => Err(KafkaHeaderParseError::Utf8Error {
1399                                                key: key.clone(),
1400                                                raw: v.to_vec(),
1401                                            }),
1402                                        }
1403                                    }
1404                                }
1405                                None => Ok(Datum::Null),
1406                            })
1407                            .unwrap_or_else(|| {
1408                                Err(KafkaHeaderParseError::KeyNotFound { key: key.clone() })
1409                            });
1410                        match d {
1411                            Ok(d) => packer.push(d),
1412                            //abort with a definite error when the header is not found or cannot be parsed correctly
1413                            Err(err) => return (Err(err), (pid, offset.into())),
1414                        }
1415                    }
1416                    None => packer.push(Datum::Null),
1417                }
1418            }
1419            KafkaMetadataKind::Headers => {
1420                packer.push_list_with(|r| {
1421                    if let Some(headers) = msg.headers() {
1422                        for header in headers.iter() {
1423                            match header.value {
1424                                Some(v) => r.push_list_with(|record_row| {
1425                                    record_row.push(Datum::String(header.key));
1426                                    record_row.push(Datum::Bytes(v));
1427                                }),
1428                                None => r.push_list_with(|record_row| {
1429                                    record_row.push(Datum::String(header.key));
1430                                    record_row.push(Datum::Null);
1431                                }),
1432                            }
1433                        }
1434                    }
1435                });
1436            }
1437        }
1438    }
1439
1440    let key = match msg.key() {
1441        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1442        None => Row::pack([Datum::Null]),
1443    };
1444    let value = match msg.payload() {
1445        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1446        None => Row::pack([Datum::Null]),
1447    };
1448    (
1449        Ok(SourceMessage {
1450            key,
1451            value,
1452            metadata,
1453        }),
1454        (pid, offset.into()),
1455    )
1456}
1457
1458/// Wrapper around a partition containing the underlying consumer
1459struct PartitionConsumer {
1460    /// the partition id with which this consumer is associated
1461    pid: PartitionId,
1462    /// The underlying Kafka partition queue
1463    partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1464}
1465
1466impl PartitionConsumer {
1467    /// Creates a new partition consumer from underlying Kafka consumer
1468    fn new(
1469        pid: PartitionId,
1470        partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1471    ) -> Self {
1472        PartitionConsumer {
1473            pid,
1474            partition_queue,
1475        }
1476    }
1477
1478    /// Returns the next message to process for this partition (if any).
1479    ///
1480    /// The outer `Result` represents irrecoverable failures, the inner one can and will
1481    /// be transformed into empty values.
1482    ///
1483    /// The inner `Option` represents if there is a message to process.
1484    fn get_next_message(&self) -> Result<Option<(BorrowedMessage<'_>, PartitionId)>, KafkaError> {
1485        match self.partition_queue.poll(Duration::from_millis(0)) {
1486            Some(Ok(msg)) => Ok(Some((msg, self.pid))),
1487            Some(Err(err)) => Err(err),
1488            _ => Ok(None),
1489        }
1490    }
1491
1492    /// Return the partition id for this PartitionConsumer
1493    fn pid(&self) -> PartitionId {
1494        self.pid
1495    }
1496}
1497
1498/// An implementation of [`ConsumerContext`] that forwards statistics to the
1499/// worker
1500struct GlueConsumerContext {
1501    notificator: Arc<Notify>,
1502    stats_tx: crossbeam_channel::Sender<Jsonb>,
1503    inner: MzClientContext,
1504}
1505
1506impl ClientContext for GlueConsumerContext {
1507    fn stats_raw(&self, statistics: &[u8]) {
1508        match Jsonb::from_slice(statistics) {
1509            Ok(statistics) => {
1510                self.stats_tx
1511                    .send(statistics)
1512                    .expect("timely operator hung up while Kafka source active");
1513                self.activate();
1514            }
1515            Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
1516        };
1517    }
1518
1519    // The shape of the rdkafka *Context traits require us to forward to the `MzClientContext`
1520    // implementation.
1521    fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
1522        self.inner.log(level, fac, log_message)
1523    }
1524    fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
1525        self.inner.error(error, reason)
1526    }
1527}
1528
1529impl GlueConsumerContext {
1530    fn activate(&self) {
1531        self.notificator.notify_one();
1532    }
1533}
1534
1535impl ConsumerContext for GlueConsumerContext {}
1536
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use mz_kafka_util::client::create_new_client_config_simple;
    use rdkafka::consumer::{BaseConsumer, Consumer};
    use rdkafka::{Message, Offset, TopicPartitionList};
    use uuid::Uuid;

    // Splitting off a partition queue with an `Offset` that is not `Offset::Beginning` seems to
    // lead to a race condition where sometimes we receive messages from polling the main consumer
    // instead of on the partition queue. This can be surfaced by running the test in a loop (in
    // the dataflow directory) using:
    //
    // cargo stress --lib --release source::kafka::tests::demonstrate_kafka_queue_race_condition
    //
    // cargo-stress can be installed via `cargo install cargo-stress`
    //
    // You need to set up a topic "queue-test" with 1000 "hello" messages in it. Obviously, running
    // this test requires a running Kafka instance at localhost:9092.
    //
    // The test is `#[ignore]`d because of that external dependency. It fails either as soon as
    // the main consumer yields a message after the partition queue has already yielded one, or
    // at the end if any message at all arrived via the main consumer.
    #[mz_ore::test]
    #[ignore]
    fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
        let topic_name = "queue-test";
        let pid = 0;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
        kafka_config.set("enable.auto.commit", "false");
        kafka_config.set("group.id", Uuid::new_v4().to_string());
        kafka_config.set("fetch.message.max.bytes", "100");
        let consumer: BaseConsumer<_> = kafka_config.create()?;

        let consumer = Arc::new(consumer);

        let mut partition_list = TopicPartitionList::new();
        // Using `Offset::Beginning` here will work fine, only `Offset::Offset(0)` leads to the
        // race condition.
        partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;

        consumer.assign(&partition_list)?;

        let partition_queue = consumer
            .split_partition_queue(topic_name, pid)
            .expect("missing partition queue");

        let expected_messages = 1_000;

        // Number of messages received from the main consumer and from the partition queue,
        // respectively.
        let mut common_queue_count = 0;
        let mut partition_queue_count = 0;

        // Alternate between polling the main consumer and the partition queue until all
        // expected messages have been seen.
        loop {
            if let Some(msg) = consumer.poll(Duration::from_millis(0)) {
                match msg {
                    Ok(msg) => {
                        let _payload =
                            std::str::from_utf8(msg.payload().expect("missing payload"))?;
                        if partition_queue_count > 0 {
                            anyhow::bail!(
                                "Got message from common queue after we internally switched to partition queue."
                            );
                        }

                        common_queue_count += 1;
                    }
                    Err(err) => anyhow::bail!("{}", err),
                }
            }

            match partition_queue.poll(Duration::from_millis(0)) {
                Some(Ok(msg)) => {
                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
                    partition_queue_count += 1;
                }
                Some(Err(err)) => anyhow::bail!("{}", err),
                _ => (),
            }

            if (common_queue_count + partition_queue_count) == expected_messages {
                break;
            }
        }

        // Every message is expected to have been delivered through the partition queue.
        assert!(
            common_queue_count == 0,
            "Got {} out of {} messages from common queue. Partition queue: {}",
            common_queue_count,
            expected_messages,
            partition_queue_count
        );

        Ok(())
    }
}
1632
1633/// Fetches the list of partitions and their corresponding high watermark.
1634fn fetch_partition_info<C: ConsumerContext>(
1635    consumer: &BaseConsumer<C>,
1636    topic: &str,
1637    fetch_timeout: Duration,
1638    offset_requested: Offset,
1639) -> Result<BTreeMap<PartitionId, PartitionWatermark>, GetPartitionsError> {
1640    let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;
1641
1642    let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
1643    for pid in pids {
1644        offset_requests.add_partition_offset(topic, pid, offset_requested)?;
1645    }
1646
1647    let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;
1648
1649    let mut result = BTreeMap::new();
1650    for entry in offset_responses.elements() {
1651        let offset = match entry.offset() {
1652            Offset::Offset(offset) => offset,
1653            offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
1654        };
1655
1656        let pid = entry.partition();
1657        let watermark = offset.try_into().expect("invalid negative offset");
1658        result.insert(pid, watermark);
1659    }
1660
1661    Ok(result)
1662}
1663
1664/// An update produced by the metadata fetcher.
1665#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
1666enum MetadataUpdate {
1667    /// The current IDs and high watermarks of all topic partitions.
1668    Partitions(BTreeMap<PartitionId, PartitionWatermark>),
1669    /// A transient error.
1670    ///
1671    /// Transient errors stall the source until their cause has been resolved.
1672    TransientError(HealthStatus),
1673    /// A definite error.
1674    ///
1675    /// Definite errors cannot be recovered from. They poison the source until the end of time.
1676    DefiniteError(SourceError),
1677}
1678
1679impl MetadataUpdate {
1680    /// Return the upstream frontier resulting from the metadata update, if any.
1681    fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
1682        match self {
1683            Self::Partitions(partitions) => {
1684                let max_pid = partitions.keys().last().copied();
1685                let lower = max_pid
1686                    .map(RangeBound::after)
1687                    .unwrap_or(RangeBound::NegInfinity);
1688                let future_ts =
1689                    Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
1690
1691                let mut frontier = Antichain::from_elem(future_ts);
1692                for (pid, high_watermark) in partitions {
1693                    frontier.insert(Partitioned::new_singleton(
1694                        RangeBound::exact(*pid),
1695                        MzOffset::from(*high_watermark),
1696                    ));
1697                }
1698
1699                Some(frontier)
1700            }
1701            Self::DefiniteError(_) => Some(Antichain::new()),
1702            Self::TransientError(_) => None,
1703        }
1704    }
1705}
1706
/// Errors raised while extracting a single requested header from a Kafka message.
#[derive(Debug, thiserror::Error)]
pub enum KafkaHeaderParseError {
    /// The message carries headers, but none of them has the requested key.
    #[error("A header with key '{key}' was not found in the message headers")]
    KeyNotFound { key: String },
    /// The header value was requested as text but is not valid UTF-8; `raw` holds the
    /// offending bytes.
    #[error(
        "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
    )]
    Utf8Error { key: String, raw: Vec<u8> },
}
1716
1717/// Render the metadata fetcher of a Kafka source.
1718///
1719/// The metadata fetcher is a single-worker operator that is responsible for periodically fetching
1720/// the Kafka topic metadata (partition IDs and high watermarks) and making it available as a
1721/// Timely stream.
1722fn render_metadata_fetcher<'scope>(
1723    scope: Scope<'scope, KafkaTimestamp>,
1724    connection: KafkaSourceConnection,
1725    config: RawSourceCreationConfig,
1726) -> (
1727    StreamVec<'scope, KafkaTimestamp, (mz_repr::Timestamp, MetadataUpdate)>,
1728    StreamVec<'scope, KafkaTimestamp, Probe<KafkaTimestamp>>,
1729    PressOnDropButton,
1730) {
1731    let active_worker_id = usize::cast_from(config.id.hashed());
1732    let is_active_worker = active_worker_id % scope.peers() == scope.index();
1733
1734    let resume_upper = Antichain::from_iter(
1735        config
1736            .source_resume_uppers
1737            .values()
1738            .map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row))
1739            .flatten(),
1740    );
1741
1742    let name = format!("KafkaMetadataFetcher({})", config.id);
1743    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
1744
1745    let (metadata_output, metadata_stream) =
1746        builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1747    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1748
1749    let button = builder.build(move |caps| async move {
1750        if !is_active_worker {
1751            return;
1752        }
1753
1754        let [metadata_cap, probe_cap] = caps.try_into().unwrap();
1755
1756        let client_id = connection.client_id(
1757            config.config.config_set(),
1758            &config.config.connection_context,
1759            config.id,
1760        );
1761        let KafkaSourceConnection {
1762            connection,
1763            topic,
1764            topic_metadata_refresh_interval,
1765            ..
1766        } = connection;
1767
1768        let consumer: Result<BaseConsumer<_>, _> = connection
1769            .create_with_context(
1770                &config.config,
1771                MzClientContext::default(),
1772                &btreemap! {
1773                    // Use the user-configured topic metadata refresh
1774                    // interval.
1775                    "topic.metadata.refresh.interval.ms" =>
1776                        topic_metadata_refresh_interval
1777                        .as_millis()
1778                        .to_string(),
1779                    // Allow Kafka monitoring tools to identify this
1780                    // consumer.
1781                    "client.id" => format!("{client_id}-metadata"),
1782                },
1783                InTask::Yes,
1784            )
1785            .await;
1786
1787        let consumer = match consumer {
1788            Ok(consumer) => consumer,
1789            Err(e) => {
1790                let msg = format!(
1791                    "failed creating kafka metadata consumer: {}",
1792                    e.display_with_causes()
1793                );
1794                let status_update = HealthStatusUpdate::halting(msg, None);
1795                let status = match e {
1796                    ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
1797                    _ => HealthStatus::kafka(status_update),
1798                };
1799                let error = MetadataUpdate::TransientError(status);
1800                let timestamp = (config.now_fn)().into();
1801                metadata_output.give(&metadata_cap, (timestamp, error));
1802
1803                // IMPORTANT: wedge forever until the `SuspendAndRestart` is processed.
1804                // Returning would incorrectly present to the remap operator as progress to the
1805                // empty frontier which would be incorrectly recorded to the remap shard.
1806                std::future::pending::<()>().await;
1807                unreachable!("pending future never returns");
1808            }
1809        };
1810
1811        let (tx, mut rx) = mpsc::unbounded_channel();
1812        spawn_metadata_thread(config, consumer, topic, tx);
1813
1814        let mut prev_upstream_frontier = resume_upper;
1815
1816        while let Some((timestamp, mut update)) = rx.recv().await {
1817            if prev_upstream_frontier.is_empty() {
1818                return;
1819            }
1820
1821            if let Some(upstream_frontier) = update.upstream_frontier() {
1822                // Topics are identified by name but it's possible that a user recreates a topic
1823                // with the same name. Ideally we'd want to catch all of these cases and
1824                // immediately error out the source, since the data is effectively gone.
1825                // Unfortunately this is not possible without something like KIP-516.
1826                //
1827                // The best we can do is check whether the upstream frontier regressed. This tells
1828                // us that the topic was recreated and now contains fewer offsets and/or fewer
1829                // partitions. Note that we are not able to detect topic recreation if neither of
1830                // the two are true.
1831                if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
1832                    let error = SourceError {
1833                        error: SourceErrorDetails::Other("topic was recreated".into()),
1834                    };
1835                    update = MetadataUpdate::DefiniteError(error);
1836                }
1837            }
1838
1839            if let Some(upstream_frontier) = update.upstream_frontier() {
1840                prev_upstream_frontier = upstream_frontier.clone();
1841
1842                let probe = Probe {
1843                    probe_ts: timestamp,
1844                    upstream_frontier,
1845                };
1846                probe_output.give(&probe_cap, probe);
1847            }
1848
1849            metadata_output.give(&metadata_cap, (timestamp, update));
1850        }
1851    });
1852
1853    (metadata_stream, probe_stream, button.press_on_drop())
1854}
1855
1856fn spawn_metadata_thread<C: ConsumerContext>(
1857    config: RawSourceCreationConfig,
1858    consumer: BaseConsumer<TunnelingClientContext<C>>,
1859    topic: String,
1860    tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
1861) {
1862    // Linux thread names are limited to 15 characters. Use a truncated ID to fit the name.
1863    thread::Builder::new()
1864        .name(format!("kfk-mtdt-{}", config.id))
1865        .spawn(move || {
1866            trace!(
1867                source_id = config.id.to_string(),
1868                worker_id = config.worker_id,
1869                num_workers = config.worker_count,
1870                "kafka metadata thread: starting..."
1871            );
1872
1873            let timestamp_interval = config.timestamp_interval;
1874            let mut ticker = probe::Ticker::new(move || timestamp_interval, config.now_fn);
1875
1876            loop {
1877                let probe_ts = ticker.tick_blocking();
1878                let result = fetch_partition_info(
1879                    &consumer,
1880                    &topic,
1881                    config
1882                        .config
1883                        .parameters
1884                        .kafka_timeout_config
1885                        .fetch_metadata_timeout,
1886                    Offset::End,
1887                );
1888                trace!(
1889                    source_id = config.id.to_string(),
1890                    worker_id = config.worker_id,
1891                    num_workers = config.worker_count,
1892                    "kafka metadata thread: metadata fetch result: {:?}",
1893                    result
1894                );
1895                let update = match result {
1896                    Ok(partitions) => {
1897                        trace!(
1898                            source_id = config.id.to_string(),
1899                            worker_id = config.worker_id,
1900                            num_workers = config.worker_count,
1901                            "kafka metadata thread: fetched partition metadata info",
1902                        );
1903
1904                        MetadataUpdate::Partitions(partitions)
1905                    }
1906                    Err(GetPartitionsError::TopicDoesNotExist) => {
1907                        let error = SourceError {
1908                            error: SourceErrorDetails::Other("topic was deleted".into()),
1909                        };
1910                        MetadataUpdate::DefiniteError(error)
1911                    }
1912                    Err(e) => {
1913                        let kafka_status = Some(HealthStatusUpdate::stalled(
1914                            format!("{}", e.display_with_causes()),
1915                            None,
1916                        ));
1917
1918                        let ssh_status = consumer.client().context().tunnel_status();
1919                        let ssh_status = match ssh_status {
1920                            SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
1921                            SshTunnelStatus::Errored(e) => {
1922                                Some(HealthStatusUpdate::stalled(e, None))
1923                            }
1924                        };
1925
1926                        MetadataUpdate::TransientError(HealthStatus {
1927                            kafka: kafka_status,
1928                            ssh: ssh_status,
1929                        })
1930                    }
1931                };
1932
1933                if tx.send((probe_ts, update)).is_err() {
1934                    break;
1935                }
1936            }
1937
1938            info!(
1939                source_id = config.id.to_string(),
1940                worker_id = config.worker_id,
1941                num_workers = config.worker_count,
1942                "kafka metadata thread: receiver has gone away; shutting down."
1943            )
1944        })
1945        .unwrap();
1946}