mz_storage/source/kafka.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::collections::btree_map::Entry;
12use std::convert::Infallible;
13use std::str::{self};
14use std::sync::Arc;
15use std::thread;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use chrono::{DateTime, NaiveDateTime};
20use differential_dataflow::{AsCollection, Hashable};
21use futures::StreamExt;
22use itertools::Itertools;
23use maplit::btreemap;
24use mz_kafka_util::client::{
25    GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
26};
27use mz_ore::assert_none;
28use mz_ore::cast::CastFrom;
29use mz_ore::error::ErrorExt;
30use mz_ore::future::InTask;
31use mz_ore::iter::IteratorExt;
32use mz_repr::adt::timestamp::CheckedTimestamp;
33use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
34use mz_ssh_util::tunnel::SshTunnelStatus;
35use mz_storage_types::dyncfgs::KAFKA_METADATA_FETCH_INTERVAL;
36use mz_storage_types::errors::{
37    ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
38};
39use mz_storage_types::sources::kafka::{
40    KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
41};
42use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
43use mz_timely_util::antichain::AntichainExt;
44use mz_timely_util::builder_async::{
45    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
46};
47use mz_timely_util::containers::stack::AccountedStackBuilder;
48use mz_timely_util::order::Partitioned;
49use rdkafka::consumer::base_consumer::PartitionQueue;
50use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
51use rdkafka::error::KafkaError;
52use rdkafka::message::{BorrowedMessage, Headers};
53use rdkafka::statistics::Statistics;
54use rdkafka::topic_partition_list::Offset;
55use rdkafka::{ClientContext, Message, TopicPartitionList};
56use serde::{Deserialize, Serialize};
57use timely::PartialOrder;
58use timely::container::CapacityContainerBuilder;
59use timely::dataflow::channels::pact::Pipeline;
60use timely::dataflow::operators::core::Partition;
61use timely::dataflow::operators::{Broadcast, Capability};
62use timely::dataflow::{Scope, Stream};
63use timely::progress::Antichain;
64use timely::progress::Timestamp;
65use tokio::sync::{Notify, mpsc};
66use tracing::{error, info, trace};
67
68use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
69use crate::metrics::source::kafka::KafkaSourceMetrics;
70use crate::source::types::{Probe, SignaledFuture, SourceRender, StackedCollection};
71use crate::source::{RawSourceCreationConfig, SourceMessage, probe};
72use crate::statistics::SourceStatistics;
73
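/// Health status updates for the Kafka source, split by the status namespace they apply to.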
74#[derive(
75    Clone,
76    Debug,
77    Default,
78    PartialEq,
79    Eq,
80    PartialOrd,
81    Ord,
82    Serialize,
83    Deserialize
84)]
85struct HealthStatus {
86    kafka: Option<HealthStatusUpdate>,
87    ssh: Option<HealthStatusUpdate>,
88}
89
90impl HealthStatus {
91    fn kafka(update: HealthStatusUpdate) -> Self {
92        Self {
93            kafka: Some(update),
94            ssh: None,
95        }
96    }
97
98    fn ssh(update: HealthStatusUpdate) -> Self {
99        Self {
100            kafka: None,
101            ssh: Some(update),
102        }
103    }
104}
105
106/// Contains all information necessary to ingest data from Kafka
107pub struct KafkaSourceReader {
    /// Name of the topic that backs this source
109    topic_name: String,
110    /// Name of the source (will have format kafka-source-id)
111    source_name: String,
112    /// Source global ID
113    id: GlobalId,
114    /// Kafka consumer for this source
115    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
116    /// List of consumers. A consumer should be assigned per partition to guarantee fairness
117    partition_consumers: Vec<PartitionConsumer>,
118    /// Worker ID
119    worker_id: usize,
120    /// Total count of workers
121    worker_count: usize,
    /// The most recently read offset for each partition known to this source
    /// reader, keyed by output index. An offset of -1 indicates that no prior
    /// message has been read for the given partition.
125    last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
126    /// The offset to start reading from for each partition.
127    start_offsets: BTreeMap<PartitionId, i64>,
128    /// Channel to receive Kafka statistics JSON blobs from the stats callback.
129    stats_rx: crossbeam_channel::Receiver<Jsonb>,
130    /// A handle to the partition specific metrics
131    partition_metrics: KafkaSourceMetrics,
132    /// Per partition capabilities used to produce messages
133    partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
134}
135
136struct PartitionCapability {
137    /// The capability of the data produced
138    data: Capability<KafkaTimestamp>,
139    /// The capability of the progress stream
140    progress: Capability<KafkaTimestamp>,
141}
142
143/// The high watermark offsets of a Kafka partition.
144///
/// This is the offset of the latest message available for consumption in the topic/partition, plus 1.
146type HighWatermark = u64;
147
/// Processes `resume_uppers` stream updates, committing them upstream and
/// recording them in the source statistics to be emitted later.
150pub struct KafkaResumeUpperProcessor {
151    config: RawSourceCreationConfig,
152    topic_name: String,
153    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
154    statistics: Vec<SourceStatistics>,
155}
156
/// Computes whether this worker is responsible for consuming a partition. It assigns partitions to
/// workers in a round-robin fashion, starting at an arbitrary worker based on the hash of the
/// source id.
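/// For example, with 3 workers and a source id whose hash selects worker 1 as the starting
/// point, partitions 0, 1, 2, and 3 are assigned to workers 1, 2, 0, and 1 respectively.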
160fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
161    let pid = usize::try_from(pid).expect("positive pid");
162    ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
163}
164
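/// Per-export information needed by the reader: the export's id, its output index, the frontier
/// to resume reading from, and the Kafka metadata columns it requests.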
165struct SourceOutputInfo {
166    id: GlobalId,
167    output_index: usize,
168    resume_upper: Antichain<KafkaTimestamp>,
169    metadata_columns: Vec<KafkaMetadataKind>,
170}
171
172impl SourceRender for KafkaSourceConnection {
    // TODO(petrosagg): The type used for the partition (RangeBound<PartitionId>) doesn't need to
    // be so complicated and we could instead use `Partitioned<PartitionId, Option<u64>>` where all
    // ranges are inclusive and a time of `None` signifies that a particular partition is not
    // present. This requires a migration of the remap shard.
177    type Time = KafkaTimestamp;
178
179    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;
180
181    fn render<G: Scope<Timestamp = KafkaTimestamp>>(
182        self,
183        scope: &mut G,
184        config: &RawSourceCreationConfig,
185        resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
186        start_signal: impl std::future::Future<Output = ()> + 'static,
187    ) -> (
188        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
189        Stream<G, Infallible>,
190        Stream<G, HealthStatusMessage>,
191        Option<Stream<G, Probe<KafkaTimestamp>>>,
192        Vec<PressOnDropButton>,
193    ) {
194        let (metadata, probes, metadata_token) =
195            render_metadata_fetcher(scope, self.clone(), config.clone());
196        let (data, progress, health, reader_token) = render_reader(
197            scope,
198            self,
199            config.clone(),
200            resume_uppers,
201            metadata,
202            start_signal,
203        );
204
205        let partition_count = u64::cast_from(config.source_exports.len());
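        // Demux the reader's single output into one stream per source export, using the output
        // index emitted by the reader as the partition key.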
206        let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
207            partition_count,
208            |((output, data), time, diff): &(
209                (usize, Result<SourceMessage, DataflowError>),
210                _,
211                Diff,
212            )| {
213                let output = u64::cast_from(*output);
214                (output, (data.clone(), time.clone(), diff.clone()))
215            },
216        );
217        let mut data_collections = BTreeMap::new();
218        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
219            data_collections.insert(*id, data_stream.as_collection());
220        }
221
222        (
223            data_collections,
224            progress,
225            health,
226            Some(probes),
227            vec![metadata_token, reader_token],
228        )
229    }
230}
231
232/// Render the reader of a Kafka source.
233///
234/// The reader is responsible for polling the Kafka topic partitions for new messages, and
235/// transforming them into a `SourceMessage` collection.
236fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
237    scope: &G,
238    connection: KafkaSourceConnection,
239    config: RawSourceCreationConfig,
240    resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
241    metadata_stream: Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
242    start_signal: impl std::future::Future<Output = ()> + 'static,
243) -> (
244    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
245    Stream<G, Infallible>,
246    Stream<G, HealthStatusMessage>,
247    PressOnDropButton,
248) {
249    let name = format!("KafkaReader({})", config.id);
250    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
251
252    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
253    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
254    let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
255
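    // The metadata stream is broadcast so that every worker observes every metadata update,
    // independently of which worker produced it.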
256    let mut metadata_input = builder.new_disconnected_input(&metadata_stream.broadcast(), Pipeline);
257
258    let mut outputs = vec![];
259
    // `all_export_stats` contains the `SourceStatistics` entries for every export, while
    // `snapshot_export_stats` contains only the entries for exports that still require a snapshot.
261    let mut all_export_stats = vec![];
262    let mut snapshot_export_stats = vec![];
263    for (idx, (id, export)) in config.source_exports.iter().enumerate() {
264        let SourceExport {
265            details,
266            storage_metadata: _,
267            data_config: _,
268        } = export;
269        let resume_upper = Antichain::from_iter(
270            config
271                .source_resume_uppers
272                .get(id)
273                .expect("all source exports must be present in source resume uppers")
274                .iter()
275                .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
276        );
277
278        let metadata_columns = match details {
279            SourceExportDetails::Kafka(details) => details
280                .metadata_columns
281                .iter()
282                .map(|(_name, kind)| kind.clone())
283                .collect::<Vec<_>>(),
284            _ => panic!("unexpected source export details: {:?}", details),
285        };
286
287        let statistics = config
288            .statistics
289            .get(id)
290            .expect("statistics have been initialized")
291            .clone();
        // A resume upper at the minimum frontier means this export has not yet committed any
        // progress and therefore requires a snapshot.
293        if resume_upper.as_ref() == &[Partitioned::minimum()] {
294            snapshot_export_stats.push(statistics.clone());
295        }
296        all_export_stats.push(statistics);
297
298        let output = SourceOutputInfo {
299            id: *id,
300            resume_upper,
301            output_index: idx,
302            metadata_columns,
303        };
304        outputs.push(output);
305    }
306
307    let busy_signal = Arc::clone(&config.busy_signal);
308    let button = builder.build(move |caps| {
309        SignaledFuture::new(busy_signal, async move {
310            let [mut data_cap, mut progress_cap, health_cap] = caps.try_into().unwrap();
311
312            let client_id = connection.client_id(
313                config.config.config_set(),
314                &config.config.connection_context,
315                config.id,
316            );
317            let group_id = connection.group_id(&config.config.connection_context, config.id);
318            let KafkaSourceConnection {
319                connection,
320                topic,
321                topic_metadata_refresh_interval,
322                start_offsets,
323                metadata_columns: _,
324                // Exhaustive match protects against forgetting to apply an
325                // option. Ignored fields are justified below.
326                connection_id: _,   // not needed here
327                group_id_prefix: _, // used above via `connection.group_id`
328            } = connection;
329
            // `start_offsets` maps each partition to the next offset to read from.
331            let mut start_offsets: BTreeMap<_, i64> = start_offsets
332                .clone()
333                .into_iter()
334                .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
335                .map(|(k, v)| (k, v))
336                .collect();
337
338            let mut partition_capabilities = BTreeMap::new();
339            let mut max_pid = None;
            let resume_upper = Antichain::from_iter(
                outputs
                    .iter()
                    .flat_map(|output| output.resume_upper.clone()),
            );
346
347            for ts in resume_upper.elements() {
348                if let Some(pid) = ts.interval().singleton() {
349                    let pid = pid.unwrap_exact();
350                    max_pid = std::cmp::max(max_pid, Some(*pid));
351                    if responsible_for_pid(&config, *pid) {
352                        let restored_offset = i64::try_from(ts.timestamp().offset)
353                            .expect("restored kafka offsets must fit into i64");
354                        if let Some(start_offset) = start_offsets.get_mut(pid) {
355                            *start_offset = std::cmp::max(restored_offset, *start_offset);
356                        } else {
357                            start_offsets.insert(*pid, restored_offset);
358                        }
359
360                        let part_ts = Partitioned::new_singleton(
361                            RangeBound::exact(*pid),
362                            ts.timestamp().clone(),
363                        );
364                        let part_cap = PartitionCapability {
365                            data: data_cap.delayed(&part_ts),
366                            progress: progress_cap.delayed(&part_ts),
367                        };
368                        partition_capabilities.insert(*pid, part_cap);
369                    }
370                }
371            }
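            // Partitions beyond the maximum partition id seen in the resume upper have not been
            // discovered yet; downgrade the capabilities to a range that covers only those future
            // partitions, starting at offset 0.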
372            let lower = max_pid
373                .map(RangeBound::after)
374                .unwrap_or(RangeBound::NegInfinity);
375            let future_ts =
376                Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
377            data_cap.downgrade(&future_ts);
378            progress_cap.downgrade(&future_ts);
379
380            info!(
381                source_id = config.id.to_string(),
382                worker_id = config.worker_id,
383                num_workers = config.worker_count,
384                "instantiating Kafka source reader at offsets {start_offsets:?}"
385            );
386
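            // The stats channel receives librdkafka statistics JSON blobs from the consumer
            // context; the notificator is how the consumer context wakes up the poll loop below.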
387            let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
388            let notificator = Arc::new(Notify::new());
389
390            let consumer: Result<BaseConsumer<_>, _> = connection
391                .create_with_context(
392                    &config.config,
393                    GlueConsumerContext {
394                        notificator: Arc::clone(&notificator),
395                        stats_tx,
396                        inner: MzClientContext::default(),
397                    },
398                    &btreemap! {
399                        // Disable Kafka auto commit. We manually commit offsets
400                        // to Kafka once we have reclocked those offsets, so
401                        // that users can use standard Kafka tools for progress
402                        // tracking.
403                        "enable.auto.commit" => "false".into(),
404                        // Always begin ingest at 0 when restarted, even if Kafka
405                        // contains committed consumer read offsets
406                        "auto.offset.reset" => "earliest".into(),
407                        // Use the user-configured topic metadata refresh
408                        // interval.
409                        "topic.metadata.refresh.interval.ms" =>
410                            topic_metadata_refresh_interval
411                            .as_millis()
412                            .to_string(),
413                        // TODO: document the rationale for this.
414                        "fetch.message.max.bytes" => "134217728".into(),
415                        // Consumer group ID, which may have been overridden by
416                        // the user. librdkafka requires this, and we use offset
417                        // committing to provide a way for users to monitor
418                        // ingest progress, though we do not rely on the
419                        // committed offsets for any functionality.
420                        "group.id" => group_id.clone(),
421                        // Allow Kafka monitoring tools to identify this
422                        // consumer.
423                        "client.id" => client_id.clone(),
424                    },
425                    InTask::Yes,
426                )
427                .await;
428
429            let consumer = match consumer {
430                Ok(consumer) => Arc::new(consumer),
431                Err(e) => {
432                    let update = HealthStatusUpdate::halting(
433                        format!(
434                            "failed creating kafka reader consumer: {}",
435                            e.display_with_causes()
436                        ),
437                        None,
438                    );
439                    health_output.give(
440                        &health_cap,
441                        HealthStatusMessage {
442                            id: None,
443                            namespace: if matches!(e, ContextCreationError::Ssh(_)) {
444                                StatusNamespace::Ssh
445                            } else {
446                                StatusNamespace::Kafka
447                            },
448                            update: update.clone(),
449                        },
450                    );
451                    for (output, update) in outputs.iter().repeat_clone(update) {
452                        health_output.give(
453                            &health_cap,
454                            HealthStatusMessage {
455                                id: Some(output.id),
456                                namespace: if matches!(e, ContextCreationError::Ssh(_)) {
457                                    StatusNamespace::Ssh
458                                } else {
459                                    StatusNamespace::Kafka
460                                },
461                                update,
462                            },
463                        );
464                    }
                    // IMPORTANT: wedge forever until the `SuspendAndRestart` is processed.
                    // Returning would present to the remap operator as progress to the empty
                    // frontier, which would be incorrectly recorded to the remap shard.
468                    std::future::pending::<()>().await;
469                    unreachable!("pending future never returns");
470                }
471            };
472
473            // Note that we wait for this AFTER we downgrade to the source `resume_upper`. This
474            // allows downstream operators (namely, the `reclock_operator`) to downgrade to the
475            // `resume_upper`, which is necessary for this basic form of backpressure to work.
476            start_signal.await;
477            info!(
478                source_id = config.id.to_string(),
479                worker_id = config.worker_id,
480                num_workers = config.worker_count,
481                "kafka worker noticed rehydration is finished, starting partition queues..."
482            );
483
484            let partition_ids = start_offsets.keys().copied().collect();
485            let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);
486
487            let mut reader = KafkaSourceReader {
488                topic_name: topic.clone(),
489                source_name: config.name.clone(),
490                id: config.id,
491                partition_consumers: Vec::new(),
492                consumer: Arc::clone(&consumer),
493                worker_id: config.worker_id,
494                worker_count: config.worker_count,
495                last_offsets: outputs
496                    .iter()
497                    .map(|output| (output.output_index, BTreeMap::new()))
498                    .collect(),
499                start_offsets,
500                stats_rx,
501                partition_metrics: config.metrics.get_kafka_source_metrics(
502                    partition_ids,
503                    topic.clone(),
504                    config.id,
505                ),
506                partition_capabilities,
507            };
508
509            let offset_committer = KafkaResumeUpperProcessor {
510                config: config.clone(),
511                topic_name: topic.clone(),
512                consumer,
513                statistics: all_export_stats.clone(),
514            };
515
516            // Seed the progress metrics with `0` if we are snapshotting.
517            if !snapshot_export_stats.is_empty() {
518                if let Err(e) = offset_committer
519                    .process_frontier(resume_upper.clone())
520                    .await
521                {
522                    offset_commit_metrics.offset_commit_failures.inc();
523                    tracing::warn!(
524                        %e,
525                        "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
526                        worker_id = config.worker_id,
527                        source_id = config.id,
528                        upper = resume_upper.pretty()
529                    );
530                }
531                // Reset snapshot statistics for any exports that are not involved
532                // in this round of snapshotting. Those that are snapshotting this round will
533                // see updates as the snapshot commences.
534                for statistics in config.statistics.values() {
535                    statistics.set_snapshot_records_known(0);
536                    statistics.set_snapshot_records_staged(0);
537                }
538            }
539
540            let resume_uppers_process_loop = async move {
541                tokio::pin!(resume_uppers);
542                while let Some(frontier) = resume_uppers.next().await {
543                    if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
544                        offset_commit_metrics.offset_commit_failures.inc();
545                        tracing::warn!(
546                            %e,
547                            "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
548                            worker_id = config.worker_id,
549                            source_id = config.id,
550                            upper = frontier.pretty()
551                        );
552                    }
553                }
                // During dataflow shutdown this loop can end due to the general chaos caused by
                // dropping tokens as a means to shut down. This call ensures this future never
                // ends and we instead rely on this operator being dropped altogether when *its*
                // token is dropped.
558                std::future::pending::<()>().await;
559            };
560            tokio::pin!(resume_uppers_process_loop);
561
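            // The most recent metadata update received from the metadata fetcher, if any, and the
            // total number of offsets that make up the initial snapshot, once known.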
562            let mut metadata_update: Option<MetadataUpdate> = None;
563            let mut snapshot_total = None;
564
565            let max_wait_time =
566                mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
567            loop {
568                // Wait for data or metadata events while also making progress with offset
569                // committing.
570                tokio::select! {
571                    // TODO(petrosagg): remove the timeout and rely purely on librdkafka waking us
572                    // up
573                    _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},
574
575                    _ = metadata_input.ready() => {
576                        // Collect all pending updates, then only keep the most recent one.
577                        let mut updates = Vec::new();
578                        while let Some(event) = metadata_input.next_sync() {
579                            if let Event::Data(_, mut data) = event {
580                                updates.append(&mut data);
581                            }
582                        }
583                        metadata_update = updates
584                            .into_iter()
585                            .max_by_key(|(ts, _)| *ts)
586                            .map(|(_, update)| update);
587                    }
588
589                    // This future is not cancel safe but we are only passing a reference to it in
590                    // the select! loop so the future stays on the stack and never gets cancelled
591                    // until the end of the function.
592                    _ = resume_uppers_process_loop.as_mut() => {},
593                }
594
595                match metadata_update.take() {
596                    Some(MetadataUpdate::Partitions(partitions)) => {
597                        let max_pid = partitions.keys().last().cloned();
598                        let lower = max_pid
599                            .map(RangeBound::after)
600                            .unwrap_or(RangeBound::NegInfinity);
601                        let future_ts = Partitioned::new_range(
602                            lower,
603                            RangeBound::PosInfinity,
604                            MzOffset::from(0),
605                        );
606
607                        let mut offset_known = 0;
608                        for (&pid, &high_watermark) in &partitions {
609                            if responsible_for_pid(&config, pid) {
610                                offset_known += high_watermark;
611                                reader.ensure_partition(pid);
612                                if let Entry::Vacant(entry) =
613                                    reader.partition_capabilities.entry(pid)
614                                {
615                                    let start_offset = match reader.start_offsets.get(&pid) {
616                                        Some(&offset) => offset.try_into().unwrap(),
617                                        None => 0u64,
618                                    };
619                                    let part_since_ts = Partitioned::new_singleton(
620                                        RangeBound::exact(pid),
621                                        MzOffset::from(start_offset),
622                                    );
623                                    let part_upper_ts = Partitioned::new_singleton(
624                                        RangeBound::exact(pid),
625                                        MzOffset::from(high_watermark),
626                                    );
627
                                    // This is the moment at which we have discovered a new partition
                                    // and we need to make sure we produce its initial snapshot at a
                                    // single timestamp so that the source transitions from no data
                                    // from this partition to all the data of this partition. We do
                                    // this by initializing the data capability to the starting offset
                                    // and, importantly, the progress capability directly to the high
                                    // watermark. This jump of the progress capability ensures that
                                    // everything until the high watermark will be reclocked to a
                                    // single point.
637                                    entry.insert(PartitionCapability {
638                                        data: data_cap.delayed(&part_since_ts),
639                                        progress: progress_cap.delayed(&part_upper_ts),
640                                    });
641                                }
642                            }
643                        }
644
645                        // If we are snapshotting, record our first set of partitions as the snapshot
646                        // size.
647                        if !snapshot_export_stats.is_empty() && snapshot_total.is_none() {
                            // Note that we want to record the _number of offsets_; because Kafka
                            // offsets start at 0, the high watermark (a frontier) already equals
                            // that count, so no subtraction is needed.
651                            snapshot_total = Some(offset_known);
652                        }
653
                        // Clear all the health namespaces we know about.
                        // Note that many Kafka sources don't have an SSH tunnel, but the
                        // `health_operator` handles this fine.
657                        for output in &outputs {
658                            for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
659                                health_output.give(
660                                    &health_cap,
661                                    HealthStatusMessage {
662                                        id: Some(output.id),
663                                        namespace,
664                                        update: HealthStatusUpdate::running(),
665                                    },
666                                );
667                            }
668                        }
669                        for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
670                            health_output.give(
671                                &health_cap,
672                                HealthStatusMessage {
673                                    id: None,
674                                    namespace,
675                                    update: HealthStatusUpdate::running(),
676                                },
677                            );
678                        }
679
680                        for export_stat in all_export_stats.iter() {
681                            export_stat.set_offset_known(offset_known);
682                        }
683
684                        data_cap.downgrade(&future_ts);
685                        progress_cap.downgrade(&future_ts);
686                    }
687                    Some(MetadataUpdate::TransientError(status)) => {
688                        if let Some(update) = status.kafka {
689                            health_output.give(
690                                &health_cap,
691                                HealthStatusMessage {
692                                    id: None,
693                                    namespace: StatusNamespace::Kafka,
694                                    update: update.clone(),
695                                },
696                            );
697                            for (output, update) in outputs.iter().repeat_clone(update) {
698                                health_output.give(
699                                    &health_cap,
700                                    HealthStatusMessage {
701                                        id: Some(output.id),
702                                        namespace: StatusNamespace::Kafka,
703                                        update,
704                                    },
705                                );
706                            }
707                        }
708                        if let Some(update) = status.ssh {
709                            health_output.give(
710                                &health_cap,
711                                HealthStatusMessage {
712                                    id: None,
713                                    namespace: StatusNamespace::Ssh,
714                                    update: update.clone(),
715                                },
716                            );
717                            for (output, update) in outputs.iter().repeat_clone(update) {
718                                health_output.give(
719                                    &health_cap,
720                                    HealthStatusMessage {
721                                        id: Some(output.id),
722                                        namespace: StatusNamespace::Ssh,
723                                        update,
724                                    },
725                                );
726                            }
727                        }
728                    }
729                    Some(MetadataUpdate::DefiniteError(error)) => {
730                        health_output.give(
731                            &health_cap,
732                            HealthStatusMessage {
733                                id: None,
734                                namespace: StatusNamespace::Kafka,
735                                update: HealthStatusUpdate::stalled(
736                                    error.to_string(),
737                                    None,
738                                ),
739                            },
740                        );
741                        let error = Err(error.into());
742                        let time = data_cap.time().clone();
743                        for (output, error) in
744                            outputs.iter().map(|o| o.output_index).repeat_clone(error)
745                        {
746                            data_output
747                                .give_fueled(&data_cap, ((output, error), time, Diff::ONE))
748                                .await;
749                        }
750
751                        return;
752                    }
753                    None => {}
754                }
755
                // Poll the consumer once. We split the consumer's partitions out into separate
                // queues and poll those individually, but it's still necessary to drive logic that
                // consumes from rdkafka's internal event queue, such as statistics callbacks.
                //
                // Additionally, assigning topics and splitting them off into separate queues is
                // not atomic, so we expect at least some messages to show up when polling the
                // consumer directly.
763                while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
764                    match result {
765                        Err(e) => {
766                            let error = format!(
767                                "kafka error when polling consumer for source: {} topic: {} : {}",
768                                reader.source_name, reader.topic_name, e
769                            );
770                            let status = HealthStatusUpdate::stalled(error, None);
771                            health_output.give(
772                                &health_cap,
773                                HealthStatusMessage {
774                                    id: None,
775                                    namespace: StatusNamespace::Kafka,
776                                    update: status.clone(),
777                                },
778                            );
779                            for (output, status) in outputs.iter().repeat_clone(status) {
780                                health_output.give(
781                                    &health_cap,
782                                    HealthStatusMessage {
783                                        id: Some(output.id),
784                                        namespace: StatusNamespace::Kafka,
785                                        update: status,
786                                    },
787                                );
788                            }
789                        }
790                        Ok(message) => {
791                            let output_messages = outputs
792                                .iter()
793                                .map(|output| {
794                                    let (message, ts) = construct_source_message(
795                                        &message,
796                                        &output.metadata_columns,
797                                    );
798                                    (output.output_index, message, ts)
799                                })
                                // This vec allocation is required to allow obtaining a `&mut`
                                // on `reader` for the `reader.handle_message` call in the
                                // loop below, since `message` is borrowed from `reader`.
803                                .collect::<Vec<_>>();
804                            for (output_index, message, ts) in output_messages {
805                                if let Some((msg, time, diff)) =
806                                    reader.handle_message(message, ts, &output_index)
807                                {
808                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
809                                    let part_cap = &reader.partition_capabilities[pid].data;
810                                    let msg = msg.map_err(|e| {
811                                        DataflowError::SourceError(Box::new(SourceError {
812                                            error: SourceErrorDetails::Other(e.to_string().into()),
813                                        }))
814                                    });
815                                    data_output
816                                        .give_fueled(part_cap, ((output_index, msg), time, diff))
817                                        .await;
818                                }
819                            }
820                        }
821                    }
822                }
823
824                reader.update_stats();
825
826                // Take the consumers temporarily to get around borrow checker errors
827                let mut consumers = std::mem::take(&mut reader.partition_consumers);
828                for consumer in consumers.iter_mut() {
829                    let pid = consumer.pid();
830                    // We want to make sure the rest of the actions in the outer loops get
831                    // a chance to run. If rdkafka keeps pumping data at us we might find
832                    // ourselves in a situation where we keep dumping data into the
833                    // dataflow without signaling progress. For this reason we consume at most
834                    // 10k messages from each partition and go around the loop.
835                    let mut partition_exhausted = false;
836                    for _ in 0..10_000 {
837                        let Some(message) = consumer.get_next_message().transpose() else {
838                            partition_exhausted = true;
839                            break;
840                        };
841
842                        for output in outputs.iter() {
843                            let message = match &message {
844                                Ok((msg, pid)) => {
845                                    let (msg, ts) =
846                                        construct_source_message(msg, &output.metadata_columns);
847                                    assert_eq!(*pid, ts.0);
848                                    Ok(reader.handle_message(msg, ts, &output.output_index))
849                                }
850                                Err(err) => Err(err),
851                            };
852                            match message {
853                                Ok(Some((msg, time, diff))) => {
854                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
855                                    let part_cap = &reader.partition_capabilities[pid].data;
856                                    let msg = msg.map_err(|e| {
857                                        DataflowError::SourceError(Box::new(SourceError {
858                                            error: SourceErrorDetails::Other(e.to_string().into()),
859                                        }))
860                                    });
861                                    data_output
862                                        .give_fueled(
863                                            part_cap,
864                                            ((output.output_index, msg), time, diff),
865                                        )
866                                        .await;
867                                }
868                                // The message was from an offset we've already seen.
869                                Ok(None) => continue,
870                                Err(err) => {
871                                    let last_offset = reader
872                                        .last_offsets
873                                        .get(&output.output_index)
874                                        .expect("output known to be installed")
875                                        .get(&pid)
876                                        .expect("partition known to be installed");
877
878                                    let status = HealthStatusUpdate::stalled(
879                                        format!(
880                                            "error consuming from source: {} topic: {topic}:\
881                                             partition: {pid} last processed offset:\
882                                             {last_offset} : {err}",
883                                            config.name
884                                        ),
885                                        None,
886                                    );
887                                    health_output.give(
888                                        &health_cap,
889                                        HealthStatusMessage {
890                                            id: None,
891                                            namespace: StatusNamespace::Kafka,
892                                            update: status.clone(),
893                                        },
894                                    );
895                                    health_output.give(
896                                        &health_cap,
897                                        HealthStatusMessage {
898                                            id: Some(output.id),
899                                            namespace: StatusNamespace::Kafka,
900                                            update: status,
901                                        },
902                                    );
903                                }
904                            }
905                        }
906                    }
907                    if !partition_exhausted {
908                        notificator.notify_one();
909                    }
910                }
911                // We can now put them back
912                assert!(reader.partition_consumers.is_empty());
913                reader.partition_consumers = consumers;
914
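                // Use the consumer's reported positions to downgrade the per-partition
                // capabilities and to update the snapshot progress statistics.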
915                let positions = reader.consumer.position().unwrap();
916                let topic_positions = positions.elements_for_topic(&reader.topic_name);
917                let mut snapshot_staged = 0;
918
919                for position in topic_positions {
                    // The offset begins in the `Offset::Invalid` state, in which case we simply
                    // skip this partition.
922                    if let Offset::Offset(offset) = position.offset() {
923                        let pid = position.partition();
924                        let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
925                        let upper =
926                            Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);
927
928                        let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
929                        match part_cap.data.try_downgrade(&upper) {
930                            Ok(()) => {
931                                if !snapshot_export_stats.is_empty() {
932                                    // The `.position()` of the consumer represents what offset we have
933                                    // read up to.
934                                    snapshot_staged += offset.try_into().unwrap_or(0u64);
935                                    // This will always be `Some` at this point.
936                                    if let Some(snapshot_total) = snapshot_total {
937                                        // We will eventually read past the snapshot total, so we need
938                                        // to bound it here.
939                                        snapshot_staged =
940                                            std::cmp::min(snapshot_staged, snapshot_total);
941                                    }
942                                }
943                            }
944                            Err(_) => {
945                                // If we can't downgrade, it means we have already seen this offset.
946                                // This is expected and we can safely ignore it.
947                                info!(
948                                    source_id = config.id.to_string(),
949                                    worker_id = config.worker_id,
950                                    num_workers = config.worker_count,
951                                    "kafka source frontier downgrade skipped due to already \
952                                     seen offset: {:?}",
953                                    upper
954                                );
955                            }
956                        };
957
                        // We use try_downgrade here because during the initial snapshot phase the
                        // data capability is not beyond the progress capability and therefore a
                        // normal downgrade would panic. Once it catches up though, the data
                        // capability is what's pushing the progress capability forward.
962                        let _ = part_cap.progress.try_downgrade(&upper);
963                    }
964                }
965
966                if let (Some(snapshot_total), true) =
967                    (snapshot_total, !snapshot_export_stats.is_empty())
968                {
969                    for export_stat in snapshot_export_stats.iter() {
970                        export_stat.set_snapshot_records_known(snapshot_total);
971                        export_stat.set_snapshot_records_staged(snapshot_staged);
972                    }
973                    if snapshot_total == snapshot_staged {
974                        snapshot_export_stats.clear();
975                    }
976                }
977            }
978        })
979    });
980
981    (
982        stream.as_collection(),
983        progress_stream,
984        health_stream,
985        button.press_on_drop(),
986    )
987}
988
989impl KafkaResumeUpperProcessor {
990    async fn process_frontier(
991        &self,
992        frontier: Antichain<KafkaTimestamp>,
993    ) -> Result<(), anyhow::Error> {
994        use rdkafka::consumer::CommitMode;
995
996        // Generate a list of partitions that this worker is responsible for
997        let mut offsets = vec![];
998        let mut offset_committed = 0;
999        for ts in frontier.iter() {
1000            if let Some(pid) = ts.interval().singleton() {
1001                let pid = pid.unwrap_exact();
1002                if responsible_for_pid(&self.config, *pid) {
1003                    offsets.push((pid.clone(), *ts.timestamp()));
1004
                    // Note that we do not subtract 1 from the frontier. Imagine
                    // that frontier is 2 for this pid. That means we have
                    // fully processed offset 0 and offset 1, which means we have
                    // processed _2_ offsets.
1009                    offset_committed += ts.timestamp().offset;
1010                }
1011            }
1012        }
1013
1014        for export_stat in self.statistics.iter() {
1015            export_stat.set_offset_committed(offset_committed);
1016        }
1017
1018        if !offsets.is_empty() {
1019            let mut tpl = TopicPartitionList::new();
1020            for (pid, offset) in offsets {
                let offset_to_commit =
                    Offset::Offset(offset.offset.try_into().expect("offset to be valid i64"));
1023                tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
1024                    .expect("offset known to be valid");
1025            }
1026            let consumer = Arc::clone(&self.consumer);
1027            mz_ore::task::spawn_blocking(
1028                || format!("source({}) kafka offset commit", self.config.id),
1029                move || consumer.commit(&tpl, CommitMode::Sync),
1030            )
1031            .await?;
1032        }
1033        Ok(())
1034    }
1035}
1036
1037impl KafkaSourceReader {
1038    /// Ensures that a partition queue for `pid` exists.
1039    fn ensure_partition(&mut self, pid: PartitionId) {
1040        if self.last_offsets.is_empty() {
1041            tracing::info!(
1042                source_id = %self.id,
1043                worker_id = %self.worker_id,
1044                "kafka source does not have any outputs, not creating partition queue");
1045
1046            return;
1047        }
1048        for last_offsets in self.last_offsets.values() {
1049            // early exit if we've already inserted this partition
1050            if last_offsets.contains_key(&pid) {
1051                return;
1052            }
1053        }
1054
1055        let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
1056        self.create_partition_queue(pid, Offset::Offset(start_offset));
1057
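        // Seed the last seen offset to one before the start offset so that the first message at
        // `start_offset` is accepted by `handle_message`.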
1058        for last_offsets in self.last_offsets.values_mut() {
1059            let prev = last_offsets.insert(pid, start_offset - 1);
1060            assert_none!(prev);
1061        }
1062    }
1063
1064    /// Creates a new partition queue for `partition_id`.
1065    fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
1066        info!(
1067            source_id = self.id.to_string(),
1068            worker_id = self.worker_id,
1069            num_workers = self.worker_count,
1070            "activating Kafka queue for topic {}, partition {}",
1071            self.topic_name,
1072            partition_id,
1073        );
1074
1075        // Collect old partition assignments
1076        let tpl = self.consumer.assignment().unwrap();
1077        // Create list from assignments
1078        let mut partition_list = TopicPartitionList::new();
1079        for partition in tpl.elements_for_topic(&self.topic_name) {
1080            partition_list
1081                .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
1082                .expect("offset known to be valid");
1083        }
1084        // Add new partition
1085        partition_list
1086            .add_partition_offset(&self.topic_name, partition_id, initial_offset)
1087            .expect("offset known to be valid");
1088        self.consumer
1089            .assign(&partition_list)
1090            .expect("assignment known to be valid");
1091
1092        // Since librdkafka v1.6.0, we need to recreate all partition queues
1093        // after every call to `self.consumer.assign`.
1094        let context = Arc::clone(self.consumer.context());
1095        for pc in &mut self.partition_consumers {
1096            pc.partition_queue = self
1097                .consumer
1098                .split_partition_queue(&self.topic_name, pc.pid)
1099                .expect("partition known to be valid");
1100            pc.partition_queue.set_nonempty_callback({
1101                let context = Arc::clone(&context);
1102                move || context.inner().activate()
1103            });
1104        }
1105
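        // Split off a dedicated queue for the newly assigned partition and install a nonempty
        // callback so the consumer context is activated when data arrives on it.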
1106        let mut partition_queue = self
1107            .consumer
1108            .split_partition_queue(&self.topic_name, partition_id)
1109            .expect("partition known to be valid");
1110        partition_queue.set_nonempty_callback(move || context.inner().activate());
1111        self.partition_consumers
1112            .push(PartitionConsumer::new(partition_id, partition_queue));
1113        assert_eq!(
1114            self.consumer
1115                .assignment()
1116                .unwrap()
1117                .elements_for_topic(&self.topic_name)
1118                .len(),
1119            self.partition_consumers.len()
1120        );
1121    }
1122
1123    /// Read any statistics JSON blobs generated via the rdkafka statistics callback.
1124    fn update_stats(&mut self) {
1125        while let Ok(stats) = self.stats_rx.try_recv() {
1126            match serde_json::from_str::<Statistics>(&stats.to_string()) {
1127                Ok(statistics) => {
1128                    let topic = statistics.topics.get(&self.topic_name);
1129                    match topic {
1130                        Some(topic) => {
1131                            for (id, partition) in &topic.partitions {
1132                                self.partition_metrics
1133                                    .set_offset_max(*id, partition.hi_offset);
1134                            }
1135                        }
1136                        None => error!("No stats found for topic: {}", &self.topic_name),
1137                    }
1138                }
1139                Err(e) => {
1140                    error!("failed decoding librdkafka statistics JSON: {}", e);
1141                }
1142            }
1143        }
1144    }
1145
    /// Checks whether the given message is viable for emission, i.e. whether its offset is past
    /// the expected offset. Returns `None` if it is not.
1148    fn handle_message(
1149        &mut self,
1150        message: Result<SourceMessage, KafkaHeaderParseError>,
1151        (partition, offset): (PartitionId, MzOffset),
1152        output_index: &usize,
1153    ) -> Option<(
1154        Result<SourceMessage, KafkaHeaderParseError>,
1155        KafkaTimestamp,
1156        Diff,
1157    )> {
1158        // Offsets are guaranteed to be monotonically increasing *unless* there is a network
1159        // issue or a new partition is added, at which point the consumer may start processing
1160        // the topic from the beginning, or we may see duplicate offsets.
1161        // At all times the following guarantee holds: if we see offset x, we have already
1162        // seen all offsets in [0, x-1] that we are ever going to see.
1163        // Offsets are guaranteed to be contiguous when compaction is disabled. If compaction
1164        // is enabled, there may be gaps in the sequence.
1165        // If we see an "old" offset, we skip that message.
1166
1167        // Given the explicit consumer-to-partition assignment, we should never receive a
1168        // message for a partition for which we have no metadata.
1169        assert!(
1170            self.last_offsets
1171                .get(output_index)
1172                .unwrap()
1173                .contains_key(&partition)
1174        );
1175
1176        let last_offset_ref = self
1177            .last_offsets
1178            .get_mut(output_index)
1179            .expect("output known to be installed")
1180            .get_mut(&partition)
1181            .expect("partition known to be installed");
1182
1183        let last_offset = *last_offset_ref;
1184        let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
1185        if offset_as_i64 <= last_offset {
1186            info!(
1187                source_id = self.id.to_string(),
1188                worker_id = self.worker_id,
1189                num_workers = self.worker_count,
1190                "kafka message before expected offset: \
1191                 source {} (reading topic {}, partition {}, output {}) \
1192                 received offset {} expected offset {:?}",
1193                self.source_name,
1194                self.topic_name,
1195                partition,
1196                output_index,
1197                offset.offset,
1198                last_offset + 1,
1199            );
1200            // We explicitly do not emit the message, as we have already processed it.
1201            None
1202        } else {
1203            *last_offset_ref = offset_as_i64;
1204
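            // The message is new: the per-partition cursor was advanced above, so stamp the
            // message with its (partition, offset) timestamp and emit it.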
1205            let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
1206            Some((message, ts, Diff::ONE))
1207        }
1208    }
1209}
1210
1211fn construct_source_message(
1212    msg: &BorrowedMessage<'_>,
1213    metadata_columns: &[KafkaMetadataKind],
1214) -> (
1215    Result<SourceMessage, KafkaHeaderParseError>,
1216    (PartitionId, MzOffset),
1217) {
1218    let pid = msg.partition();
1219    let Ok(offset) = u64::try_from(msg.offset()) else {
1220        panic!(
1221            "got negative offset ({}) from otherwise non-error'd kafka message",
1222            msg.offset()
1223        );
1224    };
1225
1226    let mut metadata = Row::default();
1227    let mut packer = metadata.packer();
1228    for kind in metadata_columns {
1229        match kind {
1230            KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
1231            KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
1232            KafkaMetadataKind::Timestamp => {
1233                let ts = msg
1234                    .timestamp()
1235                    .to_millis()
1236                    .expect("kafka sources always have upstream_time");
1237
1238                let d: Datum = DateTime::from_timestamp_millis(ts)
1239                    .and_then(|dt| {
1240                        let ct: Option<CheckedTimestamp<NaiveDateTime>> =
1241                            dt.naive_utc().try_into().ok();
1242                        ct
1243                    })
1244                    .into();
1245                packer.push(d)
1246            }
1247            KafkaMetadataKind::Header { key, use_bytes } => {
1248                match msg.headers() {
1249                    Some(headers) => {
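                        // If the same header key appears multiple times, the last
                        // occurrence wins.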
1250                        let d = headers
1251                            .iter()
1252                            .filter(|header| header.key == key)
1253                            .last()
1254                            .map(|header| match header.value {
1255                                Some(v) => {
1256                                    if *use_bytes {
1257                                        Ok(Datum::Bytes(v))
1258                                    } else {
1259                                        match str::from_utf8(v) {
1260                                            Ok(str) => Ok(Datum::String(str)),
1261                                            Err(_) => Err(KafkaHeaderParseError::Utf8Error {
1262                                                key: key.clone(),
1263                                                raw: v.to_vec(),
1264                                            }),
1265                                        }
1266                                    }
1267                                }
1268                                None => Ok(Datum::Null),
1269                            })
1270                            .unwrap_or_else(|| {
1271                                Err(KafkaHeaderParseError::KeyNotFound { key: key.clone() })
1272                            });
1273                        match d {
1274                            Ok(d) => packer.push(d),
1275                            // Abort with a definite error when the header is not found or cannot be parsed correctly.
1276                            Err(err) => return (Err(err), (pid, offset.into())),
1277                        }
1278                    }
1279                    None => packer.push(Datum::Null),
1280                }
1281            }
1282            KafkaMetadataKind::Headers => {
1283                packer.push_list_with(|r| {
1284                    if let Some(headers) = msg.headers() {
1285                        for header in headers.iter() {
1286                            match header.value {
1287                                Some(v) => r.push_list_with(|record_row| {
1288                                    record_row.push(Datum::String(header.key));
1289                                    record_row.push(Datum::Bytes(v));
1290                                }),
1291                                None => r.push_list_with(|record_row| {
1292                                    record_row.push(Datum::String(header.key));
1293                                    record_row.push(Datum::Null);
1294                                }),
1295                            }
1296                        }
1297                    }
1298                });
1299            }
1300        }
1301    }
1302
1303    let key = match msg.key() {
1304        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1305        None => Row::pack([Datum::Null]),
1306    };
1307    let value = match msg.payload() {
1308        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1309        None => Row::pack([Datum::Null]),
1310    };
1311    (
1312        Ok(SourceMessage {
1313            key,
1314            value,
1315            metadata,
1316        }),
1317        (pid, offset.into()),
1318    )
1319}
1320
1321/// Wrapper around a single partition's queue on the underlying Kafka consumer
1322struct PartitionConsumer {
1323    /// The partition id with which this consumer is associated
1324    pid: PartitionId,
1325    /// The underlying Kafka partition queue
1326    partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1327}
1328
1329impl PartitionConsumer {
1330    /// Creates a new partition consumer from the underlying Kafka consumer
1331    fn new(
1332        pid: PartitionId,
1333        partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1334    ) -> Self {
1335        PartitionConsumer {
1336            pid,
1337            partition_queue,
1338        }
1339    }
1340
1341    /// Returns the next message to process for this partition (if any).
1342    ///
1343    /// The outer `Result` represents irrecoverable failures; the inner one can and will
1344    /// be transformed into empty values.
1345    ///
1346    /// The inner `Option` represents whether there is a message to process.
1347    fn get_next_message(&self) -> Result<Option<(BorrowedMessage<'_>, PartitionId)>, KafkaError> {
1348        match self.partition_queue.poll(Duration::from_millis(0)) {
1349            Some(Ok(msg)) => Ok(Some((msg, self.pid))),
1350            Some(Err(err)) => Err(err),
1351            _ => Ok(None),
1352        }
1353    }
1354
1355    /// Returns the partition id for this `PartitionConsumer`
1356    fn pid(&self) -> PartitionId {
1357        self.pid
1358    }
1359}
1360
1361/// An implementation of [`ConsumerContext`] that forwards statistics to the
1362/// worker
1363struct GlueConsumerContext {
1364    notificator: Arc<Notify>,
1365    stats_tx: crossbeam_channel::Sender<Jsonb>,
1366    inner: MzClientContext,
1367}
1368
1369impl ClientContext for GlueConsumerContext {
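    // librdkafka invokes this callback on its configured statistics interval with the raw
    // statistics JSON; forward it to the source operator and wake it up.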
1370    fn stats_raw(&self, statistics: &[u8]) {
1371        match Jsonb::from_slice(statistics) {
1372            Ok(statistics) => {
1373                self.stats_tx
1374                    .send(statistics)
1375                    .expect("timely operator hung up while Kafka source active");
1376                self.activate();
1377            }
1378            Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
1379        };
1380    }
1381
1382    // The shape of the rdkafka *Context traits requires us to forward to the `MzClientContext`
1383    // implementation.
1384    fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
1385        self.inner.log(level, fac, log_message)
1386    }
1387    fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
1388        self.inner.error(error, reason)
1389    }
1390}
1391
1392impl GlueConsumerContext {
1393    fn activate(&self) {
1394        self.notificator.notify_one();
1395    }
1396}
1397
1398impl ConsumerContext for GlueConsumerContext {}
1399
1400#[cfg(test)]
1401mod tests {
1402    use std::sync::Arc;
1403    use std::time::Duration;
1404
1405    use mz_kafka_util::client::create_new_client_config_simple;
1406    use rdkafka::consumer::{BaseConsumer, Consumer};
1407    use rdkafka::{Message, Offset, TopicPartitionList};
1408    use uuid::Uuid;
1409
1410    // Splitting off a partition queue with an `Offset` that is not `Offset::Beginning` seems to
1411    // lead to a race condition where sometimes we receive messages from polling the main consumer
1412    // instead of on the partition queue. This can be surfaced by running the test in a loop (in
1413    // the dataflow directory) using:
1414    //
1415    // cargo stress --lib --release source::kafka::tests::reproduce_kafka_queue_issue
1416    //
1417    // cargo-stress can be installed via `cargo install cargo-stress`
1418    //
1419    // You need to set up a topic "queue-test" with 1000 "hello" messages in it. Obviously, running
1420    // this test requires a running Kafka instance at localhost:9092.
1421    #[mz_ore::test]
1422    #[ignore]
1423    fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
1424        let topic_name = "queue-test";
1425        let pid = 0;
1426
1427        let mut kafka_config = create_new_client_config_simple();
1428        kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
1429        kafka_config.set("enable.auto.commit", "false");
1430        kafka_config.set("group.id", Uuid::new_v4().to_string());
1431        kafka_config.set("fetch.message.max.bytes", "100");
1432        let consumer: BaseConsumer<_> = kafka_config.create()?;
1433
1434        let consumer = Arc::new(consumer);
1435
1436        let mut partition_list = TopicPartitionList::new();
1437        // Using Offset::Beginning here will work fine; only Offset::Offset(0) leads to the race
1438        // condition.
1439        partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;
1440
1441        consumer.assign(&partition_list)?;
1442
1443        let partition_queue = consumer
1444            .split_partition_queue(topic_name, pid)
1445            .expect("missing partition queue");
1446
1447        let expected_messages = 1_000;
1448
1449        let mut common_queue_count = 0;
1450        let mut partition_queue_count = 0;
1451
1452        loop {
1453            if let Some(msg) = consumer.poll(Duration::from_millis(0)) {
1454                match msg {
1455                    Ok(msg) => {
1456                        let _payload =
1457                            std::str::from_utf8(msg.payload().expect("missing payload"))?;
1458                        if partition_queue_count > 0 {
1459                            anyhow::bail!(
1460                                "Got message from common queue after we internally switched to partition queue."
1461                            );
1462                        }
1463
1464                        common_queue_count += 1;
1465                    }
1466                    Err(err) => anyhow::bail!("{}", err),
1467                }
1468            }
1469
1470            match partition_queue.poll(Duration::from_millis(0)) {
1471                Some(Ok(msg)) => {
1472                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
1473                    partition_queue_count += 1;
1474                }
1475                Some(Err(err)) => anyhow::bail!("{}", err),
1476                _ => (),
1477            }
1478
1479            if (common_queue_count + partition_queue_count) == expected_messages {
1480                break;
1481            }
1482        }
1483
1484        assert!(
1485            common_queue_count == 0,
1486            "Got {} out of {} messages from common queue. Partition queue: {}",
1487            common_queue_count,
1488            expected_messages,
1489            partition_queue_count
1490        );
1491
1492        Ok(())
1493    }
1494}
1495
1496/// Fetches the list of partitions and their corresponding high watermarks.
1497fn fetch_partition_info<C: ConsumerContext>(
1498    consumer: &BaseConsumer<C>,
1499    topic: &str,
1500    fetch_timeout: Duration,
1501) -> Result<BTreeMap<PartitionId, HighWatermark>, GetPartitionsError> {
1502    let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;
1503
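    // Request the end offset (i.e. the high watermark) of every partition of the topic.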
1504    let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
1505    for pid in pids {
1506        offset_requests.add_partition_offset(topic, pid, Offset::End)?;
1507    }
1508
1509    let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;
1510
1511    let mut result = BTreeMap::new();
1512    for entry in offset_responses.elements() {
1513        let offset = match entry.offset() {
1514            Offset::Offset(offset) => offset,
1515            offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
1516        };
1517
1518        let pid = entry.partition();
1519        let watermark = offset.try_into().expect("invalid negative offset");
1520        result.insert(pid, watermark);
1521    }
1522
1523    Ok(result)
1524}
1525
1526/// An update produced by the metadata fetcher.
1527#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
1528enum MetadataUpdate {
1529    /// The current IDs and high watermarks of all topic partitions.
1530    Partitions(BTreeMap<PartitionId, HighWatermark>),
1531    /// A transient error.
1532    ///
1533    /// Transient errors stall the source until their cause has been resolved.
1534    TransientError(HealthStatus),
1535    /// A definite error.
1536    ///
1537    /// Definite errors cannot be recovered from. They poison the source until the end of time.
1538    DefiniteError(SourceError),
1539}
1540
1541impl MetadataUpdate {
1542    /// Return the upstream frontier resulting from the metadata update, if any.
1543    fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
1544        match self {
1545            Self::Partitions(partitions) => {
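                // The resulting frontier has one exact element per known partition, at its
                // high watermark, plus a range element covering all partition IDs greater
                // than the largest known one, at offset 0, to account for partitions that
                // may appear later. For example, partitions {0: 10, 1: 5} yield the frontier
                // {pid 0 at 10, pid 1 at 5, every pid > 1 at 0}.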
1546                let max_pid = partitions.keys().last().copied();
1547                let lower = max_pid
1548                    .map(RangeBound::after)
1549                    .unwrap_or(RangeBound::NegInfinity);
1550                let future_ts =
1551                    Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
1552
1553                let mut frontier = Antichain::from_elem(future_ts);
1554                for (pid, high_watermark) in partitions {
1555                    frontier.insert(Partitioned::new_singleton(
1556                        RangeBound::exact(*pid),
1557                        MzOffset::from(*high_watermark),
1558                    ));
1559                }
1560
1561                Some(frontier)
1562            }
1563            Self::DefiniteError(_) => Some(Antichain::new()),
1564            Self::TransientError(_) => None,
1565        }
1566    }
1567}
1568
1569#[derive(Debug, thiserror::Error)]
1570pub enum KafkaHeaderParseError {
1571    #[error("A header with key '{key}' was not found in the message headers")]
1572    KeyNotFound { key: String },
1573    #[error(
1574        "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
1575    )]
1576    Utf8Error { key: String, raw: Vec<u8> },
1577}
1578
1579/// Render the metadata fetcher of a Kafka source.
1580///
1581/// The metadata fetcher is a single-worker operator that is responsible for periodically fetching
1582/// the Kafka topic metadata (partition IDs and high watermarks) and making it available as a
1583/// Timely stream.
1584fn render_metadata_fetcher<G: Scope<Timestamp = KafkaTimestamp>>(
1585    scope: &G,
1586    connection: KafkaSourceConnection,
1587    config: RawSourceCreationConfig,
1588) -> (
1589    Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
1590    Stream<G, Probe<KafkaTimestamp>>,
1591    PressOnDropButton,
1592) {
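    // Only a single worker, chosen by hashing the source ID, runs the metadata fetcher.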
1593    let active_worker_id = usize::cast_from(config.id.hashed());
1594    let is_active_worker = active_worker_id % scope.peers() == scope.index();
1595
1596    let resume_upper = Antichain::from_iter(
1597        config
1598            .source_resume_uppers
1599            .values()
1600            .flat_map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row)),
1601    );
1603
1604    let name = format!("KafkaMetadataFetcher({})", config.id);
1605    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
1606
1607    let (metadata_output, metadata_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
1608    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
1609
1610    let button = builder.build(move |caps| async move {
1611        if !is_active_worker {
1612            return;
1613        }
1614
1615        let [metadata_cap, probe_cap] = caps.try_into().unwrap();
1616
1617        let client_id = connection.client_id(
1618            config.config.config_set(),
1619            &config.config.connection_context,
1620            config.id,
1621        );
1622        let KafkaSourceConnection {
1623            connection,
1624            topic,
1625            topic_metadata_refresh_interval,
1626            ..
1627        } = connection;
1628
1629        let consumer: Result<BaseConsumer<_>, _> = connection
1630            .create_with_context(
1631                &config.config,
1632                MzClientContext::default(),
1633                &btreemap! {
1634                    // Use the user-configured topic metadata refresh
1635                    // interval.
1636                    "topic.metadata.refresh.interval.ms" =>
1637                        topic_metadata_refresh_interval
1638                        .as_millis()
1639                        .to_string(),
1640                    // Allow Kafka monitoring tools to identify this
1641                    // consumer.
1642                    "client.id" => format!("{client_id}-metadata"),
1643                },
1644                InTask::Yes,
1645            )
1646            .await;
1647
1648        let consumer = match consumer {
1649            Ok(consumer) => consumer,
1650            Err(e) => {
1651                let msg = format!(
1652                    "failed creating kafka metadata consumer: {}",
1653                    e.display_with_causes()
1654                );
1655                let status_update = HealthStatusUpdate::halting(msg, None);
1656                let status = match e {
1657                    ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
1658                    _ => HealthStatus::kafka(status_update),
1659                };
1660                let error = MetadataUpdate::TransientError(status);
1661                let timestamp = (config.now_fn)().into();
1662                metadata_output.give(&metadata_cap, (timestamp, error));
1663
1664                // IMPORTANT: wedge forever until the `SuspendAndRestart` is processed.
1665                // Returning would present itself to the remap operator as progress to the
1666                // empty frontier, which would then be incorrectly recorded to the remap shard.
1667                std::future::pending::<()>().await;
1668                unreachable!("pending future never returns");
1669            }
1670        };
1671
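        // The metadata fetch uses blocking librdkafka calls, so it runs on a dedicated
        // thread and reports its results through this channel.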
1672        let (tx, mut rx) = mpsc::unbounded_channel();
1673        spawn_metadata_thread(config, consumer, topic, tx);
1674
1675        let mut prev_upstream_frontier = resume_upper;
1676
1677        while let Some((timestamp, mut update)) = rx.recv().await {
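            // If the upstream frontier is already empty, the source has either completed or
            // hit a definite error; there is nothing further for the fetcher to do.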
1678            if prev_upstream_frontier.is_empty() {
1679                return;
1680            }
1681
1682            if let Some(upstream_frontier) = update.upstream_frontier() {
1683                // Topics are identified by name but it's possible that a user recreates a topic
1684                // with the same name. Ideally we'd want to catch all of these cases and
1685                // immediately error out the source, since the data is effectively gone.
1686                // Unfortunately this is not possible without something like KIP-516.
1687                //
1688                // The best we can do is check whether the upstream frontier regressed. This tells
1689                // us that the topic was recreated and now contains fewer offsets and/or fewer
1690                // partitions. Note that we are not able to detect topic recreation if neither of
1691                // the two are true.
1692                if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
1693                    let error = SourceError {
1694                        error: SourceErrorDetails::Other("topic was recreated".into()),
1695                    };
1696                    update = MetadataUpdate::DefiniteError(error);
1697                }
1698            }
1699
1700            if let Some(upstream_frontier) = update.upstream_frontier() {
1701                prev_upstream_frontier = upstream_frontier.clone();
1702
1703                let probe = Probe {
1704                    probe_ts: timestamp,
1705                    upstream_frontier,
1706                };
1707                probe_output.give(&probe_cap, probe);
1708            }
1709
1710            metadata_output.give(&metadata_cap, (timestamp, update));
1711        }
1712    });
1713
1714    (metadata_stream, probe_stream, button.press_on_drop())
1715}
1716
1717fn spawn_metadata_thread<C: ConsumerContext>(
1718    config: RawSourceCreationConfig,
1719    consumer: BaseConsumer<TunnelingClientContext<C>>,
1720    topic: String,
1721    tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
1722) {
1723    // Linux thread names are limited to 15 characters. Use a truncated ID to fit the name.
1724    thread::Builder::new()
1725        .name(format!("kfk-mtdt-{}", config.id))
1726        .spawn(move || {
1727            trace!(
1728                source_id = config.id.to_string(),
1729                worker_id = config.worker_id,
1730                num_workers = config.worker_count,
1731                "kafka metadata thread: starting..."
1732            );
1733
1734            let mut ticker = probe::Ticker::new(
1735                || KAFKA_METADATA_FETCH_INTERVAL.get(config.config.config_set()),
1736                config.now_fn,
1737            );
1738
1739            loop {
1740                let probe_ts = ticker.tick_blocking();
1741                let result = fetch_partition_info(
1742                    &consumer,
1743                    &topic,
1744                    config
1745                        .config
1746                        .parameters
1747                        .kafka_timeout_config
1748                        .fetch_metadata_timeout,
1749                );
1750                trace!(
1751                    source_id = config.id.to_string(),
1752                    worker_id = config.worker_id,
1753                    num_workers = config.worker_count,
1754                    "kafka metadata thread: metadata fetch result: {:?}",
1755                    result
1756                );
1757                let update = match result {
1758                    Ok(partitions) => {
1759                        trace!(
1760                            source_id = config.id.to_string(),
1761                            worker_id = config.worker_id,
1762                            num_workers = config.worker_count,
1763                            "kafka metadata thread: fetched partition metadata info",
1764                        );
1765
1766                        MetadataUpdate::Partitions(partitions)
1767                    }
1768                    Err(GetPartitionsError::TopicDoesNotExist) => {
1769                        let error = SourceError {
1770                            error: SourceErrorDetails::Other("topic was deleted".into()),
1771                        };
1772                        MetadataUpdate::DefiniteError(error)
1773                    }
1774                    Err(e) => {
1775                        let kafka_status = Some(HealthStatusUpdate::stalled(
1776                            format!("{}", e.display_with_causes()),
1777                            None,
1778                        ));
1779
1780                        let ssh_status = consumer.client().context().tunnel_status();
1781                        let ssh_status = match ssh_status {
1782                            SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
1783                            SshTunnelStatus::Errored(e) => {
1784                                Some(HealthStatusUpdate::stalled(e, None))
1785                            }
1786                        };
1787
1788                        MetadataUpdate::TransientError(HealthStatus {
1789                            kafka: kafka_status,
1790                            ssh: ssh_status,
1791                        })
1792                    }
1793                };
1794
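                // A send error means the receiving operator has shut down, so stop the fetch
                // loop (and this thread) as well.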
1795                if tx.send((probe_ts, update)).is_err() {
1796                    break;
1797                }
1798            }
1799
1800            info!(
1801                source_id = config.id.to_string(),
1802                worker_id = config.worker_id,
1803                num_workers = config.worker_count,
1804                "kafka metadata thread: receiver has gone away; shutting down."
1805            )
1806        })
1807        .unwrap();
1808}