mz_storage/source/
kafka.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::collections::btree_map::Entry;
12use std::convert::Infallible;
13use std::str::{self};
14use std::sync::{Arc, Mutex};
15use std::thread;
16use std::time::Duration;
17
18use anyhow::anyhow;
19use chrono::{DateTime, NaiveDateTime};
20use differential_dataflow::{AsCollection, Hashable};
21use futures::StreamExt;
22use itertools::Itertools;
23use maplit::btreemap;
24use mz_kafka_util::client::{
25    GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
26};
27use mz_ore::assert_none;
28use mz_ore::cast::CastFrom;
29use mz_ore::error::ErrorExt;
30use mz_ore::future::InTask;
31use mz_ore::iter::IteratorExt;
32use mz_repr::adt::timestamp::CheckedTimestamp;
33use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
34use mz_ssh_util::tunnel::SshTunnelStatus;
35use mz_storage_types::dyncfgs::KAFKA_METADATA_FETCH_INTERVAL;
36use mz_storage_types::errors::{
37    ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
38};
39use mz_storage_types::sources::kafka::{
40    KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
41};
42use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
43use mz_timely_util::antichain::AntichainExt;
44use mz_timely_util::builder_async::{
45    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
46};
47use mz_timely_util::containers::stack::AccountedStackBuilder;
48use mz_timely_util::order::Partitioned;
49use rdkafka::consumer::base_consumer::PartitionQueue;
50use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
51use rdkafka::error::KafkaError;
52use rdkafka::message::{BorrowedMessage, Headers};
53use rdkafka::statistics::Statistics;
54use rdkafka::topic_partition_list::Offset;
55use rdkafka::{ClientContext, Message, TopicPartitionList};
56use serde::{Deserialize, Serialize};
57use timely::PartialOrder;
58use timely::container::CapacityContainerBuilder;
59use timely::dataflow::channels::pact::Pipeline;
60use timely::dataflow::operators::core::Partition;
61use timely::dataflow::operators::{Broadcast, Capability};
62use timely::dataflow::{Scope, Stream};
63use timely::progress::Antichain;
64use timely::progress::Timestamp;
65use tokio::sync::{Notify, mpsc};
66use tracing::{error, info, trace};
67
68use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
69use crate::metrics::source::kafka::KafkaSourceMetrics;
70use crate::source::types::{
71    Probe, ProgressStatisticsUpdate, SignaledFuture, SourceRender, StackedCollection,
72};
73use crate::source::{RawSourceCreationConfig, SourceMessage, probe};
74
75#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
76struct HealthStatus {
77    kafka: Option<HealthStatusUpdate>,
78    ssh: Option<HealthStatusUpdate>,
79}
80
81impl HealthStatus {
82    fn kafka(update: HealthStatusUpdate) -> Self {
83        Self {
84            kafka: Some(update),
85            ssh: None,
86        }
87    }
88
89    fn ssh(update: HealthStatusUpdate) -> Self {
90        Self {
91            kafka: None,
92            ssh: Some(update),
93        }
94    }
95}
96
97/// Contains all information necessary to ingest data from Kafka
98pub struct KafkaSourceReader {
99    /// Name of the topic on which this source is backed on
100    topic_name: String,
101    /// Name of the source (will have format kafka-source-id)
102    source_name: String,
103    /// Source global ID
104    id: GlobalId,
105    /// Kafka consumer for this source
106    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
107    /// List of consumers. A consumer should be assigned per partition to guarantee fairness
108    partition_consumers: Vec<PartitionConsumer>,
109    /// Worker ID
110    worker_id: usize,
111    /// Total count of workers
112    worker_count: usize,
113    /// The most recently read offset for each partition known to this source
114    /// reader by output-index. An offset of -1 indicates that no prior message
115    /// has been read for the given partition.
116    last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
117    /// The offset to start reading from for each partition.
118    start_offsets: BTreeMap<PartitionId, i64>,
119    /// Channel to receive Kafka statistics JSON blobs from the stats callback.
120    stats_rx: crossbeam_channel::Receiver<Jsonb>,
121    /// Progress statistics as collected from the `resume_uppers` stream and the partition metadata
122    /// thread.
123    progress_statistics: Arc<Mutex<PartialProgressStatistics>>,
124    /// A handle to the partition specific metrics
125    partition_metrics: KafkaSourceMetrics,
126    /// Per partition capabilities used to produce messages
127    partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
128}
129
130/// A partially-filled version of `ProgressStatisticsUpdate`. This allows us to
131/// only emit updates when `offset_known` is updated by the metadata thread.
132#[derive(Default)]
133struct PartialProgressStatistics {
134    offset_known: Option<u64>,
135    offset_committed: Option<u64>,
136}
137
138struct PartitionCapability {
139    /// The capability of the data produced
140    data: Capability<KafkaTimestamp>,
141    /// The capability of the progress stream
142    progress: Capability<KafkaTimestamp>,
143}
144
145/// The high watermark offsets of a Kafka partition.
146///
147/// This is the offset of the latest message in the topic/partition available for consumption + 1.
148type HighWatermark = u64;
149
150/// Processes `resume_uppers` stream updates, committing them upstream and
151/// storing them in the `progress_statistics` to be emitted later.
152pub struct KafkaResumeUpperProcessor {
153    config: RawSourceCreationConfig,
154    topic_name: String,
155    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
156    progress_statistics: Arc<Mutex<PartialProgressStatistics>>,
157}
158
159/// Computes whether this worker is responsible for consuming a partition. It assigns partitions to
160/// workers in a round-robin fashion, starting at an arbitrary worker based on the hash of the
161/// source id.
162fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
163    let pid = usize::try_from(pid).expect("positive pid");
164    ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
165}
166
167struct SourceOutputInfo {
168    id: GlobalId,
169    output_index: usize,
170    resume_upper: Antichain<KafkaTimestamp>,
171    metadata_columns: Vec<KafkaMetadataKind>,
172}
173
174impl SourceRender for KafkaSourceConnection {
175    // TODO(petrosagg): The type used for the partition (RangeBound<PartitionId>) doesn't need to
176    // be so complicated and we could instead use `Partitioned<PartitionId, Option<u64>>` where all
177    // ranges are inclusive and a time of `None` signifies that a particular partition is not
178    // present. This requires an shard migration of the remap shard.
179    type Time = KafkaTimestamp;
180
181    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;
182
183    fn render<G: Scope<Timestamp = KafkaTimestamp>>(
184        self,
185        scope: &mut G,
186        config: &RawSourceCreationConfig,
187        resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
188        start_signal: impl std::future::Future<Output = ()> + 'static,
189    ) -> (
190        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
191        Stream<G, Infallible>,
192        Stream<G, HealthStatusMessage>,
193        Stream<G, ProgressStatisticsUpdate>,
194        Option<Stream<G, Probe<KafkaTimestamp>>>,
195        Vec<PressOnDropButton>,
196    ) {
197        let (metadata, probes, metadata_token) =
198            render_metadata_fetcher(scope, self.clone(), config.clone());
199        let (data, progress, health, stats, reader_token) = render_reader(
200            scope,
201            self,
202            config.clone(),
203            resume_uppers,
204            metadata,
205            start_signal,
206        );
207
208        let partition_count = u64::cast_from(config.source_exports.len());
209        let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
210            partition_count,
211            |((output, data), time, diff): &(
212                (usize, Result<SourceMessage, DataflowError>),
213                _,
214                Diff,
215            )| {
216                let output = u64::cast_from(*output);
217                (output, (data.clone(), time.clone(), diff.clone()))
218            },
219        );
220        let mut data_collections = BTreeMap::new();
221        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
222            data_collections.insert(*id, data_stream.as_collection());
223        }
224
225        (
226            data_collections,
227            progress,
228            health,
229            stats,
230            Some(probes),
231            vec![metadata_token, reader_token],
232        )
233    }
234}
235
236/// Render the reader of a Kafka source.
237///
238/// The reader is responsible for polling the Kafka topic partitions for new messages, and
239/// transforming them into a `SourceMessage` collection.
240fn render_reader<G: Scope<Timestamp = KafkaTimestamp>>(
241    scope: &G,
242    connection: KafkaSourceConnection,
243    config: RawSourceCreationConfig,
244    resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
245    metadata_stream: Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
246    start_signal: impl std::future::Future<Output = ()> + 'static,
247) -> (
248    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
249    Stream<G, Infallible>,
250    Stream<G, HealthStatusMessage>,
251    Stream<G, ProgressStatisticsUpdate>,
252    PressOnDropButton,
253) {
254    let name = format!("KafkaReader({})", config.id);
255    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
256
257    let (data_output, stream) = builder.new_output::<AccountedStackBuilder<_>>();
258    let (_progress_output, progress_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
259    let (health_output, health_stream) = builder.new_output();
260    let (stats_output, stats_stream) = builder.new_output();
261
262    let mut metadata_input = builder.new_disconnected_input(&metadata_stream.broadcast(), Pipeline);
263
264    let mut outputs = vec![];
265    for (idx, (id, export)) in config.source_exports.iter().enumerate() {
266        let SourceExport {
267            details,
268            storage_metadata: _,
269            data_config: _,
270        } = export;
271        let resume_upper = Antichain::from_iter(
272            config
273                .source_resume_uppers
274                .get(id)
275                .expect("all source exports must be present in source resume uppers")
276                .iter()
277                .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
278        );
279
280        let metadata_columns = match details {
281            SourceExportDetails::Kafka(details) => details
282                .metadata_columns
283                .iter()
284                .map(|(_name, kind)| kind.clone())
285                .collect::<Vec<_>>(),
286            SourceExportDetails::None => {
287                // This is an export that doesn't need any data output to it.
288                continue;
289            }
290            _ => panic!("unexpected source export details: {:?}", details),
291        };
292
293        let output = SourceOutputInfo {
294            id: *id,
295            resume_upper,
296            output_index: idx,
297            metadata_columns,
298        };
299        outputs.push(output);
300    }
301
302    let busy_signal = Arc::clone(&config.busy_signal);
303    let button = builder.build(move |caps| {
304        SignaledFuture::new(busy_signal, async move {
305            let [mut data_cap, mut progress_cap, health_cap, stats_cap] = caps.try_into().unwrap();
306
307            let client_id = connection.client_id(
308                config.config.config_set(),
309                &config.config.connection_context,
310                config.id,
311            );
312            let group_id = connection.group_id(&config.config.connection_context, config.id);
313            let KafkaSourceConnection {
314                connection,
315                topic,
316                topic_metadata_refresh_interval,
317                start_offsets,
318                metadata_columns: _,
319                // Exhaustive match protects against forgetting to apply an
320                // option. Ignored fields are justified below.
321                connection_id: _,   // not needed here
322                group_id_prefix: _, // used above via `connection.group_id`
323            } = connection;
324
325            // Start offsets is a map from partition to the next offset to read from.
326            let mut start_offsets: BTreeMap<_, i64> = start_offsets
327                .clone()
328                .into_iter()
329                .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
330                .map(|(k, v)| (k, v))
331                .collect();
332
333            let mut partition_capabilities = BTreeMap::new();
334            let mut max_pid = None;
335            let resume_upper = Antichain::from_iter(
336                outputs
337                    .iter()
338                    .map(|output| output.resume_upper.clone())
339                    .flatten(),
340            );
341
342            // Whether or not this instance of the dataflow is performing a snapshot.
343            let mut is_snapshotting = &*resume_upper == &[Partitioned::minimum()];
344
345            for ts in resume_upper.elements() {
346                if let Some(pid) = ts.interval().singleton() {
347                    let pid = pid.unwrap_exact();
348                    max_pid = std::cmp::max(max_pid, Some(*pid));
349                    if responsible_for_pid(&config, *pid) {
350                        let restored_offset = i64::try_from(ts.timestamp().offset)
351                            .expect("restored kafka offsets must fit into i64");
352                        if let Some(start_offset) = start_offsets.get_mut(pid) {
353                            *start_offset = std::cmp::max(restored_offset, *start_offset);
354                        } else {
355                            start_offsets.insert(*pid, restored_offset);
356                        }
357
358                        let part_ts = Partitioned::new_singleton(
359                            RangeBound::exact(*pid),
360                            ts.timestamp().clone(),
361                        );
362                        let part_cap = PartitionCapability {
363                            data: data_cap.delayed(&part_ts),
364                            progress: progress_cap.delayed(&part_ts),
365                        };
366                        partition_capabilities.insert(*pid, part_cap);
367                    }
368                }
369            }
370            let lower = max_pid
371                .map(RangeBound::after)
372                .unwrap_or(RangeBound::NegInfinity);
373            let future_ts =
374                Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
375            data_cap.downgrade(&future_ts);
376            progress_cap.downgrade(&future_ts);
377
378            info!(
379                source_id = config.id.to_string(),
380                worker_id = config.worker_id,
381                num_workers = config.worker_count,
382                "instantiating Kafka source reader at offsets {start_offsets:?}"
383            );
384
385            let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
386            let notificator = Arc::new(Notify::new());
387
388            let consumer: Result<BaseConsumer<_>, _> = connection
389                .create_with_context(
390                    &config.config,
391                    GlueConsumerContext {
392                        notificator: Arc::clone(&notificator),
393                        stats_tx,
394                        inner: MzClientContext::default(),
395                    },
396                    &btreemap! {
397                        // Disable Kafka auto commit. We manually commit offsets
398                        // to Kafka once we have reclocked those offsets, so
399                        // that users can use standard Kafka tools for progress
400                        // tracking.
401                        "enable.auto.commit" => "false".into(),
402                        // Always begin ingest at 0 when restarted, even if Kafka
403                        // contains committed consumer read offsets
404                        "auto.offset.reset" => "earliest".into(),
405                        // Use the user-configured topic metadata refresh
406                        // interval.
407                        "topic.metadata.refresh.interval.ms" =>
408                            topic_metadata_refresh_interval
409                            .as_millis()
410                            .to_string(),
411                        // TODO: document the rationale for this.
412                        "fetch.message.max.bytes" => "134217728".into(),
413                        // Consumer group ID, which may have been overridden by
414                        // the user. librdkafka requires this, and we use offset
415                        // committing to provide a way for users to monitor
416                        // ingest progress, though we do not rely on the
417                        // committed offsets for any functionality.
418                        "group.id" => group_id.clone(),
419                        // Allow Kafka monitoring tools to identify this
420                        // consumer.
421                        "client.id" => client_id.clone(),
422                    },
423                    InTask::Yes,
424                )
425                .await;
426
427            let consumer = match consumer {
428                Ok(consumer) => Arc::new(consumer),
429                Err(e) => {
430                    let update = HealthStatusUpdate::halting(
431                        format!(
432                            "failed creating kafka reader consumer: {}",
433                            e.display_with_causes()
434                        ),
435                        None,
436                    );
437                    for (output, update) in outputs.iter().repeat_clone(update) {
438                        health_output.give(
439                            &health_cap,
440                            HealthStatusMessage {
441                                id: Some(output.id),
442                                namespace: if matches!(e, ContextCreationError::Ssh(_)) {
443                                    StatusNamespace::Ssh
444                                } else {
445                                    StatusNamespace::Kafka
446                                },
447                                update,
448                            },
449                        );
450                    }
451                    // IMPORTANT: wedge forever until the `SuspendAndRestart` is processed.
452                    // Returning would incorrectly present to the remap operator as progress to the
453                    // empty frontier which would be incorrectly recorded to the remap shard.
454                    std::future::pending::<()>().await;
455                    unreachable!("pending future never returns");
456                }
457            };
458
459            // Note that we wait for this AFTER we downgrade to the source `resume_upper`. This
460            // allows downstream operators (namely, the `reclock_operator`) to downgrade to the
461            // `resume_upper`, which is necessary for this basic form of backpressure to work.
462            start_signal.await;
463            info!(
464                source_id = config.id.to_string(),
465                worker_id = config.worker_id,
466                num_workers = config.worker_count,
467                "kafka worker noticed rehydration is finished, starting partition queues..."
468            );
469
470            let partition_ids = start_offsets.keys().copied().collect();
471            let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);
472
473            let mut reader = KafkaSourceReader {
474                topic_name: topic.clone(),
475                source_name: config.name.clone(),
476                id: config.id,
477                partition_consumers: Vec::new(),
478                consumer: Arc::clone(&consumer),
479                worker_id: config.worker_id,
480                worker_count: config.worker_count,
481                last_offsets: outputs
482                    .iter()
483                    .map(|output| (output.output_index, BTreeMap::new()))
484                    .collect(),
485                start_offsets,
486                stats_rx,
487                progress_statistics: Default::default(),
488                partition_metrics: config.metrics.get_kafka_source_metrics(
489                    partition_ids,
490                    topic.clone(),
491                    config.id,
492                ),
493                partition_capabilities,
494            };
495
496            let offset_committer = KafkaResumeUpperProcessor {
497                config: config.clone(),
498                topic_name: topic.clone(),
499                consumer,
500                progress_statistics: Arc::clone(&reader.progress_statistics),
501            };
502
503            // Seed the progress metrics with `0` if we are snapshotting.
504            if is_snapshotting {
505                if let Err(e) = offset_committer
506                    .process_frontier(resume_upper.clone())
507                    .await
508                {
509                    offset_commit_metrics.offset_commit_failures.inc();
510                    tracing::warn!(
511                        %e,
512                        "timely-{} source({}) failed to commit offsets: resume_upper={}",
513                        config.id,
514                        config.worker_id,
515                        resume_upper.pretty()
516                    );
517                }
518            }
519
520            let resume_uppers_process_loop = async move {
521                tokio::pin!(resume_uppers);
522                while let Some(frontier) = resume_uppers.next().await {
523                    if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
524                        offset_commit_metrics.offset_commit_failures.inc();
525                        tracing::warn!(
526                            %e,
527                            "timely-{} source({}) failed to commit offsets: resume_upper={}",
528                            config.id,
529                            config.worker_id,
530                            frontier.pretty()
531                        );
532                    }
533                }
534                // During dataflow shutdown this loop can end due to the general chaos caused by
535                // dropping tokens as a means to shutdown. This call ensures this future never ends
536                // and we instead rely on this operator being dropped altogether when *its* token
537                // is dropped.
538                std::future::pending::<()>().await;
539            };
540            tokio::pin!(resume_uppers_process_loop);
541
542            let mut prev_offset_known = None;
543            let mut prev_offset_committed = None;
544            let mut metadata_update: Option<MetadataUpdate> = None;
545            let mut snapshot_total = None;
546
547            let max_wait_time =
548                mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
549            loop {
550                // Wait for data or metadata events while also making progress with offset
551                // committing.
552                tokio::select! {
553                    // TODO(petrosagg): remove the timeout and rely purely on librdkafka waking us
554                    // up
555                    _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},
556
557                    _ = metadata_input.ready() => {
558                        // Collect all pending updates, then only keep the most recent one.
559                        let mut updates = Vec::new();
560                        while let Some(event) = metadata_input.next_sync() {
561                            if let Event::Data(_, mut data) = event {
562                                updates.append(&mut data);
563                            }
564                        }
565                        metadata_update = updates
566                            .into_iter()
567                            .max_by_key(|(ts, _)| *ts)
568                            .map(|(_, update)| update);
569                    }
570
571                    // This future is not cancel safe but we are only passing a reference to it in
572                    // the select! loop so the future stays on the stack and never gets cancelled
573                    // until the end of the function.
574                    _ = resume_uppers_process_loop.as_mut() => {},
575                }
576
577                match metadata_update.take() {
578                    Some(MetadataUpdate::Partitions(partitions)) => {
579                        let max_pid = partitions.keys().last().cloned();
580                        let lower = max_pid
581                            .map(RangeBound::after)
582                            .unwrap_or(RangeBound::NegInfinity);
583                        let future_ts = Partitioned::new_range(
584                            lower,
585                            RangeBound::PosInfinity,
586                            MzOffset::from(0),
587                        );
588
589                        let mut upstream_stat = 0;
590                        for (&pid, &high_watermark) in &partitions {
591                            if responsible_for_pid(&config, pid) {
592                                upstream_stat += high_watermark;
593                                reader.ensure_partition(pid);
594                                if let Entry::Vacant(entry) =
595                                    reader.partition_capabilities.entry(pid)
596                                {
597                                    let start_offset = match reader.start_offsets.get(&pid) {
598                                        Some(&offset) => offset.try_into().unwrap(),
599                                        None => 0u64,
600                                    };
601                                    let part_since_ts = Partitioned::new_singleton(
602                                        RangeBound::exact(pid),
603                                        MzOffset::from(start_offset),
604                                    );
605                                    let part_upper_ts = Partitioned::new_singleton(
606                                        RangeBound::exact(pid),
607                                        MzOffset::from(high_watermark),
608                                    );
609
610                                    // This is the moment at which we have discovered a new partition
611                                    // and we need to make sure we produce its initial snapshot at a,
612                                    // single timestamp so that the source transitions from no data
613                                    // from this partition to all the data of this partition. We do
614                                    // this by initializing the data capability to the starting offset
615                                    // and, importantly, the progress capability directly to the high
616                                    // watermark. This jump of the progress capability ensures that
617                                    // everything until the high watermark will be reclocked to a
618                                    // single point.
619                                    entry.insert(PartitionCapability {
620                                        data: data_cap.delayed(&part_since_ts),
621                                        progress: progress_cap.delayed(&part_upper_ts),
622                                    });
623                                }
624                            }
625                        }
626
627                        // If we are snapshotting, record our first set of partitions as the snapshot
628                        // size.
629                        if is_snapshotting && snapshot_total.is_none() {
630                            // Note that we want to represent the _number of offsets_, which
631                            // means the watermark's frontier semantics is correct, without
632                            // subtracting (Kafka offsets start at 0).
633                            snapshot_total = Some(upstream_stat);
634                        }
635
636                        // Clear all the health namespaces we know about.
637                        // Note that many kafka sources's don't have an ssh tunnel, but the
638                        // `health_operator` handles this fine.
639                        for output in &outputs {
640                            for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
641                                health_output.give(
642                                    &health_cap,
643                                    HealthStatusMessage {
644                                        id: Some(output.id),
645                                        namespace,
646                                        update: HealthStatusUpdate::running(),
647                                    },
648                                );
649                            }
650                        }
651
652                        let mut progress_statistics =
653                            reader.progress_statistics.lock().expect("poisoned");
654                        progress_statistics.offset_known = Some(upstream_stat);
655                        data_cap.downgrade(&future_ts);
656                        progress_cap.downgrade(&future_ts);
657                    }
658                    Some(MetadataUpdate::TransientError(status)) => {
659                        if let Some(update) = status.kafka {
660                            for (output, update) in outputs.iter().repeat_clone(update) {
661                                health_output.give(
662                                    &health_cap,
663                                    HealthStatusMessage {
664                                        id: Some(output.id),
665                                        namespace: StatusNamespace::Kafka,
666                                        update,
667                                    },
668                                );
669                            }
670                        }
671                        if let Some(update) = status.ssh {
672                            for (output, update) in outputs.iter().repeat_clone(update) {
673                                health_output.give(
674                                    &health_cap,
675                                    HealthStatusMessage {
676                                        id: Some(output.id),
677                                        namespace: StatusNamespace::Ssh,
678                                        update,
679                                    },
680                                );
681                            }
682                        }
683                    }
684                    Some(MetadataUpdate::DefiniteError(error)) => {
685                        let error = Err(error.into());
686                        let time = data_cap.time().clone();
687                        for (output, error) in
688                            outputs.iter().map(|o| o.output_index).repeat_clone(error)
689                        {
690                            data_output
691                                .give_fueled(&data_cap, ((output, error), time, Diff::ONE))
692                                .await;
693                        }
694
695                        return;
696                    }
697                    None => {}
698                }
699
700                // Poll the consumer once. We split the consumer's partitions out into separate
701                // queues and poll those individually, but it's still necessary to drive logic that
702                // consumes from rdkafka's internal event queue, such as statistics callbacks.
703                //
704                // Additionally, assigning topics and splitting them off into separate queues is
705                // not atomic, so we expect to see at least some messages to show up when polling
706                // the consumer directly.
707                while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
708                    match result {
709                        Err(e) => {
710                            let error = format!(
711                                "kafka error when polling consumer for source: {} topic: {} : {}",
712                                reader.source_name, reader.topic_name, e
713                            );
714                            let status = HealthStatusUpdate::stalled(error, None);
715                            for (output, status) in outputs.iter().repeat_clone(status) {
716                                health_output.give(
717                                    &health_cap,
718                                    HealthStatusMessage {
719                                        id: Some(output.id),
720                                        namespace: StatusNamespace::Kafka,
721                                        update: status,
722                                    },
723                                );
724                            }
725                        }
726                        Ok(message) => {
727                            let output_messages = outputs
728                                .iter()
729                                .map(|output| {
730                                    let (message, ts) = construct_source_message(
731                                        &message,
732                                        &output.metadata_columns,
733                                    );
734                                    (output.output_index, message, ts)
735                                })
736                                // This vec allocation is required to allow obtaining a `&mut`
737                                // on `reader` for the `reader.handle_message` call in the
738                                // loop below since  `message` is borrowed from `reader`.
739                                .collect::<Vec<_>>();
740                            for (output_index, message, ts) in output_messages {
741                                if let Some((msg, time, diff)) =
742                                    reader.handle_message(message, ts, &output_index)
743                                {
744                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
745                                    let part_cap = &reader.partition_capabilities[pid].data;
746                                    let msg = msg.map_err(|e| {
747                                        DataflowError::SourceError(Box::new(SourceError {
748                                            error: SourceErrorDetails::Other(e.to_string().into()),
749                                        }))
750                                    });
751                                    data_output
752                                        .give_fueled(part_cap, ((output_index, msg), time, diff))
753                                        .await;
754                                }
755                            }
756                        }
757                    }
758                }
759
760                reader.update_stats();
761
762                // Take the consumers temporarily to get around borrow checker errors
763                let mut consumers = std::mem::take(&mut reader.partition_consumers);
764                for consumer in consumers.iter_mut() {
765                    let pid = consumer.pid();
766                    // We want to make sure the rest of the actions in the outer loops get
767                    // a chance to run. If rdkafka keeps pumping data at us we might find
768                    // ourselves in a situation where we keep dumping data into the
769                    // dataflow without signaling progress. For this reason we consume at most
770                    // 10k messages from each partition and go around the loop.
771                    let mut partition_exhausted = false;
772                    for _ in 0..10_000 {
773                        let Some(message) = consumer.get_next_message().transpose() else {
774                            partition_exhausted = true;
775                            break;
776                        };
777
778                        for output in outputs.iter() {
779                            let message = match &message {
780                                Ok((msg, pid)) => {
781                                    let (msg, ts) =
782                                        construct_source_message(msg, &output.metadata_columns);
783                                    assert_eq!(*pid, ts.0);
784                                    Ok(reader.handle_message(msg, ts, &output.output_index))
785                                }
786                                Err(err) => Err(err),
787                            };
788                            match message {
789                                Ok(Some((msg, time, diff))) => {
790                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
791                                    let part_cap = &reader.partition_capabilities[pid].data;
792                                    let msg = msg.map_err(|e| {
793                                        DataflowError::SourceError(Box::new(SourceError {
794                                            error: SourceErrorDetails::Other(e.to_string().into()),
795                                        }))
796                                    });
797                                    data_output
798                                        .give_fueled(
799                                            part_cap,
800                                            ((output.output_index, msg), time, diff),
801                                        )
802                                        .await;
803                                }
804                                // The message was from an offset we've already seen.
805                                Ok(None) => continue,
806                                Err(err) => {
807                                    let last_offset = reader
808                                        .last_offsets
809                                        .get(&output.output_index)
810                                        .expect("output known to be installed")
811                                        .get(&pid)
812                                        .expect("partition known to be installed");
813
814                                    let status = HealthStatusUpdate::stalled(
815                                        format!(
816                                            "error consuming from source: {} topic: {topic}:\
817                                             partition: {pid} last processed offset:\
818                                             {last_offset} : {err}",
819                                            config.name
820                                        ),
821                                        None,
822                                    );
823                                    health_output.give(
824                                        &health_cap,
825                                        HealthStatusMessage {
826                                            id: Some(output.id),
827                                            namespace: StatusNamespace::Kafka,
828                                            update: status,
829                                        },
830                                    );
831                                }
832                            }
833                        }
834                    }
835                    if !partition_exhausted {
836                        notificator.notify_one();
837                    }
838                }
839                // We can now put them back
840                assert!(reader.partition_consumers.is_empty());
841                reader.partition_consumers = consumers;
842
843                let positions = reader.consumer.position().unwrap();
844                let topic_positions = positions.elements_for_topic(&reader.topic_name);
845                let mut snapshot_staged = 0;
846
847                for position in topic_positions {
848                    // The offset begins in the `Offset::Invalid` state in which case we simply
849                    // skip this partition.
850                    if let Offset::Offset(offset) = position.offset() {
851                        let pid = position.partition();
852                        let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
853                        let upper =
854                            Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);
855
856                        let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
857                        match part_cap.data.try_downgrade(&upper) {
858                            Ok(()) => {
859                                if is_snapshotting {
860                                    // The `.position()` of the consumer represents what offset we have
861                                    // read up to.
862                                    snapshot_staged += offset.try_into().unwrap_or(0u64);
863                                    // This will always be `Some` at this point.
864                                    if let Some(snapshot_total) = snapshot_total {
865                                        // We will eventually read past the snapshot total, so we need
866                                        // to bound it here.
867                                        snapshot_staged =
868                                            std::cmp::min(snapshot_staged, snapshot_total);
869                                    }
870                                }
871                            }
872                            Err(_) => {
873                                // If we can't downgrade, it means we have already seen this offset.
874                                // This is expected and we can safely ignore it.
875                                info!(
876                                    source_id = config.id.to_string(),
877                                    worker_id = config.worker_id,
878                                    num_workers = config.worker_count,
879                                    "kafka source frontier downgrade skipped due to already \
880                                     seen offset: {:?}",
881                                    upper
882                                );
883                            }
884                        };
885
886                        // We use try_downgrade here because during the initial snapshot phase the
887                        // data capability is not beyond the progress capability and therefore a
888                        // normal downgrade would panic. Once it catches up though the data
889                        // capbility is what's pushing the progress capability forward.
890                        let _ = part_cap.progress.try_downgrade(&upper);
891                    }
892                }
893
894                // If we have a new `offset_known` from the partition metadata thread, and
895                // `committed` from reading the `resume_uppers` stream, we can emit a
896                // progress stats update.
897                let mut stats =
898                    { std::mem::take(&mut *reader.progress_statistics.lock().expect("poisoned")) };
899
900                let offset_committed = stats.offset_committed.take().or(prev_offset_committed);
901                let offset_known = stats.offset_known.take().or(prev_offset_known);
902                if let Some((offset_known, offset_committed)) = offset_known.zip(offset_committed) {
903                    stats_output.give(
904                        &stats_cap,
905                        ProgressStatisticsUpdate::SteadyState {
906                            offset_committed,
907                            offset_known,
908                        },
909                    );
910                }
911                prev_offset_committed = offset_committed;
912                prev_offset_known = offset_known;
913
914                if let (Some(snapshot_total), true) = (snapshot_total, is_snapshotting) {
915                    stats_output.give(
916                        &stats_cap,
917                        ProgressStatisticsUpdate::Snapshot {
918                            records_known: snapshot_total,
919                            records_staged: snapshot_staged,
920                        },
921                    );
922
923                    if snapshot_total == snapshot_staged {
924                        is_snapshotting = false;
925                    }
926                }
927            }
928        })
929    });
930
931    (
932        stream.as_collection(),
933        progress_stream,
934        health_stream,
935        stats_stream,
936        button.press_on_drop(),
937    )
938}
939
940impl KafkaResumeUpperProcessor {
941    async fn process_frontier(
942        &self,
943        frontier: Antichain<KafkaTimestamp>,
944    ) -> Result<(), anyhow::Error> {
945        use rdkafka::consumer::CommitMode;
946
947        // Generate a list of partitions that this worker is responsible for
948        let mut offsets = vec![];
949        let mut progress_stat = 0;
950        for ts in frontier.iter() {
951            if let Some(pid) = ts.interval().singleton() {
952                let pid = pid.unwrap_exact();
953                if responsible_for_pid(&self.config, *pid) {
954                    offsets.push((pid.clone(), *ts.timestamp()));
955
956                    // Note that we do not subtract 1 from the frontier. Imagine
957                    // that frontier is 2 for this pid. That means we have
958                    // full processed offset 0 and offset 1, which means we have
959                    // processed _2_ offsets.
960                    progress_stat += ts.timestamp().offset;
961                }
962            }
963        }
964        self.progress_statistics
965            .lock()
966            .expect("poisoned")
967            .offset_committed = Some(progress_stat);
968
969        if !offsets.is_empty() {
970            let mut tpl = TopicPartitionList::new();
971            for (pid, offset) in offsets {
972                let offset_to_commit =
973                    Offset::Offset(offset.offset.try_into().expect("offset to be vald i64"));
974                tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
975                    .expect("offset known to be valid");
976            }
977            let consumer = Arc::clone(&self.consumer);
978            mz_ore::task::spawn_blocking(
979                || format!("source({}) kafka offset commit", self.config.id),
980                move || consumer.commit(&tpl, CommitMode::Sync),
981            )
982            .await??;
983        }
984        Ok(())
985    }
986}
987
988impl KafkaSourceReader {
989    /// Ensures that a partition queue for `pid` exists.
990    fn ensure_partition(&mut self, pid: PartitionId) {
991        if self.last_offsets.is_empty() {
992            tracing::info!(
993                source_id = %self.id,
994                worker_id = %self.worker_id,
995                "kafka source does not have any outputs, not creating partition queue");
996
997            return;
998        }
999        for last_offsets in self.last_offsets.values() {
1000            // early exit if we've already inserted this partition
1001            if last_offsets.contains_key(&pid) {
1002                return;
1003            }
1004        }
1005
1006        let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
1007        self.create_partition_queue(pid, Offset::Offset(start_offset));
1008
1009        for last_offsets in self.last_offsets.values_mut() {
1010            let prev = last_offsets.insert(pid, start_offset - 1);
1011            assert_none!(prev);
1012        }
1013    }
1014
1015    /// Creates a new partition queue for `partition_id`.
1016    fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
1017        info!(
1018            source_id = self.id.to_string(),
1019            worker_id = self.worker_id,
1020            num_workers = self.worker_count,
1021            "activating Kafka queue for topic {}, partition {}",
1022            self.topic_name,
1023            partition_id,
1024        );
1025
1026        // Collect old partition assignments
1027        let tpl = self.consumer.assignment().unwrap();
1028        // Create list from assignments
1029        let mut partition_list = TopicPartitionList::new();
1030        for partition in tpl.elements_for_topic(&self.topic_name) {
1031            partition_list
1032                .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
1033                .expect("offset known to be valid");
1034        }
1035        // Add new partition
1036        partition_list
1037            .add_partition_offset(&self.topic_name, partition_id, initial_offset)
1038            .expect("offset known to be valid");
1039        self.consumer
1040            .assign(&partition_list)
1041            .expect("assignment known to be valid");
1042
1043        // Since librdkafka v1.6.0, we need to recreate all partition queues
1044        // after every call to `self.consumer.assign`.
1045        let context = Arc::clone(self.consumer.context());
1046        for pc in &mut self.partition_consumers {
1047            pc.partition_queue = self
1048                .consumer
1049                .split_partition_queue(&self.topic_name, pc.pid)
1050                .expect("partition known to be valid");
1051            pc.partition_queue.set_nonempty_callback({
1052                let context = Arc::clone(&context);
1053                move || context.inner().activate()
1054            });
1055        }
1056
1057        let mut partition_queue = self
1058            .consumer
1059            .split_partition_queue(&self.topic_name, partition_id)
1060            .expect("partition known to be valid");
1061        partition_queue.set_nonempty_callback(move || context.inner().activate());
1062        self.partition_consumers
1063            .push(PartitionConsumer::new(partition_id, partition_queue));
1064        assert_eq!(
1065            self.consumer
1066                .assignment()
1067                .unwrap()
1068                .elements_for_topic(&self.topic_name)
1069                .len(),
1070            self.partition_consumers.len()
1071        );
1072    }
1073
1074    /// Read any statistics JSON blobs generated via the rdkafka statistics callback.
1075    fn update_stats(&mut self) {
1076        while let Ok(stats) = self.stats_rx.try_recv() {
1077            match serde_json::from_str::<Statistics>(&stats.to_string()) {
1078                Ok(statistics) => {
1079                    let topic = statistics.topics.get(&self.topic_name);
1080                    match topic {
1081                        Some(topic) => {
1082                            for (id, partition) in &topic.partitions {
1083                                self.partition_metrics
1084                                    .set_offset_max(*id, partition.hi_offset);
1085                            }
1086                        }
1087                        None => error!("No stats found for topic: {}", &self.topic_name),
1088                    }
1089                }
1090                Err(e) => {
1091                    error!("failed decoding librdkafka statistics JSON: {}", e);
1092                }
1093            }
1094        }
1095    }
1096
1097    /// Checks if the given message is viable for emission. This checks if the message offset is
1098    /// past the expected offset and returns None if it is not.
1099    fn handle_message(
1100        &mut self,
1101        message: Result<SourceMessage, KafkaHeaderParseError>,
1102        (partition, offset): (PartitionId, MzOffset),
1103        output_index: &usize,
1104    ) -> Option<(
1105        Result<SourceMessage, KafkaHeaderParseError>,
1106        KafkaTimestamp,
1107        Diff,
1108    )> {
1109        // Offsets are guaranteed to be 1) monotonically increasing *unless* there is
1110        // a network issue or a new partition added, at which point the consumer may
1111        // start processing the topic from the beginning, or we may see duplicate offsets
1112        // At all times, the guarantee : if we see offset x, we have seen all offsets [0,x-1]
1113        // that we are ever going to see holds.
1114        // Offsets are guaranteed to be contiguous when compaction is disabled. If compaction
1115        // is enabled, there may be gaps in the sequence.
1116        // If we see an "old" offset, we skip that message.
1117
1118        // Given the explicit consumer to partition assignment, we should never receive a message
1119        // for a partition for which we have no metadata
1120        assert!(
1121            self.last_offsets
1122                .get(output_index)
1123                .unwrap()
1124                .contains_key(&partition)
1125        );
1126
1127        let last_offset_ref = self
1128            .last_offsets
1129            .get_mut(output_index)
1130            .expect("output known to be installed")
1131            .get_mut(&partition)
1132            .expect("partition known to be installed");
1133
1134        let last_offset = *last_offset_ref;
1135        let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
1136        if offset_as_i64 <= last_offset {
1137            info!(
1138                source_id = self.id.to_string(),
1139                worker_id = self.worker_id,
1140                num_workers = self.worker_count,
1141                "kafka message before expected offset: \
1142                 source {} (reading topic {}, partition {}, output {}) \
1143                 received offset {} expected offset {:?}",
1144                self.source_name,
1145                self.topic_name,
1146                partition,
1147                output_index,
1148                offset.offset,
1149                last_offset + 1,
1150            );
1151            // We explicitly should not consume the message as we have already processed it.
1152            None
1153        } else {
1154            *last_offset_ref = offset_as_i64;
1155
1156            let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
1157            Some((message, ts, Diff::ONE))
1158        }
1159    }
1160}
1161
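/// Converts a raw Kafka message into a [`SourceMessage`], packing the requested metadata
/// columns (partition, offset, timestamp, and headers) next to the key and value, and returns
/// it together with the partition/offset pair the message was read from.
///
/// Header metadata that cannot be extracted is reported as a [`KafkaHeaderParseError`].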
1162fn construct_source_message(
1163    msg: &BorrowedMessage<'_>,
1164    metadata_columns: &[KafkaMetadataKind],
1165) -> (
1166    Result<SourceMessage, KafkaHeaderParseError>,
1167    (PartitionId, MzOffset),
1168) {
1169    let pid = msg.partition();
1170    let Ok(offset) = u64::try_from(msg.offset()) else {
1171        panic!(
1172            "got negative offset ({}) from otherwise non-error'd kafka message",
1173            msg.offset()
1174        );
1175    };
1176
1177    let mut metadata = Row::default();
1178    let mut packer = metadata.packer();
1179    for kind in metadata_columns {
1180        match kind {
1181            KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
1182            KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
1183            KafkaMetadataKind::Timestamp => {
1184                let ts = msg
1185                    .timestamp()
1186                    .to_millis()
1187                    .expect("kafka sources always have upstream_time");
1188
1189                let d: Datum = DateTime::from_timestamp_millis(ts)
1190                    .and_then(|dt| {
1191                        let ct: Option<CheckedTimestamp<NaiveDateTime>> =
1192                            dt.naive_utc().try_into().ok();
1193                        ct
1194                    })
1195                    .into();
1196                packer.push(d)
1197            }
1198            KafkaMetadataKind::Header { key, use_bytes } => {
1199                match msg.headers() {
1200                    Some(headers) => {
1201                        let d = headers
1202                            .iter()
1203                            .filter(|header| header.key == key)
1204                            .last()
1205                            .map(|header| match header.value {
1206                                Some(v) => {
1207                                    if *use_bytes {
1208                                        Ok(Datum::Bytes(v))
1209                                    } else {
1210                                        match str::from_utf8(v) {
1211                                            Ok(str) => Ok(Datum::String(str)),
1212                                            Err(_) => Err(KafkaHeaderParseError::Utf8Error {
1213                                                key: key.clone(),
1214                                                raw: v.to_vec(),
1215                                            }),
1216                                        }
1217                                    }
1218                                }
1219                                None => Ok(Datum::Null),
1220                            })
1221                            .unwrap_or(Err(KafkaHeaderParseError::KeyNotFound {
1222                                key: key.clone(),
1223                            }));
1224                        match d {
1225                            Ok(d) => packer.push(d),
1226                            // Abort with a definite error when the header is not found or cannot be parsed correctly.
1227                            Err(err) => return (Err(err), (pid, offset.into())),
1228                        }
1229                    }
1230                    None => packer.push(Datum::Null),
1231                }
1232            }
1233            KafkaMetadataKind::Headers => {
1234                packer.push_list_with(|r| {
1235                    if let Some(headers) = msg.headers() {
1236                        for header in headers.iter() {
1237                            match header.value {
1238                                Some(v) => r.push_list_with(|record_row| {
1239                                    record_row.push(Datum::String(header.key));
1240                                    record_row.push(Datum::Bytes(v));
1241                                }),
1242                                None => r.push_list_with(|record_row| {
1243                                    record_row.push(Datum::String(header.key));
1244                                    record_row.push(Datum::Null);
1245                                }),
1246                            }
1247                        }
1248                    }
1249                });
1250            }
1251        }
1252    }
1253
1254    let key = match msg.key() {
1255        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1256        None => Row::pack([Datum::Null]),
1257    };
1258    let value = match msg.payload() {
1259        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1260        None => Row::pack([Datum::Null]),
1261    };
1262    (
1263        Ok(SourceMessage {
1264            key,
1265            value,
1266            metadata,
1267        }),
1268        (pid, offset.into()),
1269    )
1270}
1271
1272/// Wrapper around a single partition's queue from the underlying Kafka consumer.
1273struct PartitionConsumer {
1274    /// The partition id with which this consumer is associated.
1275    pid: PartitionId,
1276    /// The underlying Kafka partition queue
1277    partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1278}
1279
1280impl PartitionConsumer {
1281    /// Creates a new partition consumer from the underlying Kafka partition queue.
1282    fn new(
1283        pid: PartitionId,
1284        partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1285    ) -> Self {
1286        PartitionConsumer {
1287            pid,
1288            partition_queue,
1289        }
1290    }
1291
1292    /// Returns the next message to process for this partition (if any).
1293    ///
1294    /// The `Result` represents irrecoverable failures; the absence of a buffered message is
1295    /// not an error and is reported as an empty value instead.
1296    ///
1297    /// The inner `Option` indicates whether there is currently a message to process.
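    ///
    /// A sketch of the intended polling pattern (the surrounding caller code is hypothetical):
    ///
    /// ```ignore
    /// match partition_consumer.get_next_message() {
    ///     Ok(Some((msg, pid))) => { /* process `msg`, read from partition `pid` */ }
    ///     Ok(None) => { /* nothing buffered right now; poll again later */ }
    ///     Err(err) => { /* surface the Kafka error */ }
    /// }
    /// ```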
1298    fn get_next_message(&self) -> Result<Option<(BorrowedMessage, PartitionId)>, KafkaError> {
1299        match self.partition_queue.poll(Duration::from_millis(0)) {
1300            Some(Ok(msg)) => Ok(Some((msg, self.pid))),
1301            Some(Err(err)) => Err(err),
1302            _ => Ok(None),
1303        }
1304    }
1305
1306    /// Returns the partition id for this `PartitionConsumer`.
1307    fn pid(&self) -> PartitionId {
1308        self.pid
1309    }
1310}
1311
1312/// An implementation of [`ConsumerContext`] that forwards statistics to the
1313/// worker
1314struct GlueConsumerContext {
1315    notificator: Arc<Notify>,
1316    stats_tx: crossbeam_channel::Sender<Jsonb>,
1317    inner: MzClientContext,
1318}
1319
1320impl ClientContext for GlueConsumerContext {
1321    fn stats_raw(&self, statistics: &[u8]) {
1322        match Jsonb::from_slice(statistics) {
1323            Ok(statistics) => {
1324                self.stats_tx
1325                    .send(statistics)
1326                    .expect("timely operator hung up while Kafka source active");
1327                self.activate();
1328            }
1329            Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
1330        };
1331    }
1332
1333    // The shape of the rdkafka *Context traits requires us to forward to the `MzClientContext`
1334    // implementation.
1335    fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
1336        self.inner.log(level, fac, log_message)
1337    }
1338    fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
1339        self.inner.error(error, reason)
1340    }
1341}
1342
1343impl GlueConsumerContext {
1344    fn activate(&self) {
1345        self.notificator.notify_one();
1346    }
1347}
1348
1349impl ConsumerContext for GlueConsumerContext {}
1350
1351#[cfg(test)]
1352mod tests {
1353    use std::sync::Arc;
1354    use std::time::Duration;
1355
1356    use mz_kafka_util::client::create_new_client_config_simple;
1357    use rdkafka::consumer::{BaseConsumer, Consumer};
1358    use rdkafka::{Message, Offset, TopicPartitionList};
1359    use uuid::Uuid;
1360
1361    // Splitting off a partition queue with an `Offset` that is not `Offset::Beginning` seems to
1362    // lead to a race condition where sometimes we receive messages from polling the main consumer
1363    // instead of on the partition queue. This can be surfaced by running the test in a loop (in
1364    // the dataflow directory) using:
1365    //
1366    // cargo stress --lib --release source::kafka::tests::demonstrate_kafka_queue_race_condition
1367    //
1368    // cargo-stress can be installed via `cargo install cargo-stress`
1369    //
1370    // You need to set up a topic "queue-test" with 1000 "hello" messages in it. Obviously, running
1371    // this test requires a running Kafka instance at localhost:9092.
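    //
    // A minimal sketch for populating the topic with rdkafka (assuming the broker address and
    // message count above; not part of the test itself, and the exact API may differ by
    // rdkafka version):
    //
    //     use rdkafka::producer::{BaseProducer, BaseRecord, Producer};
    //
    //     let producer: BaseProducer = create_new_client_config_simple()
    //         .set("bootstrap.servers", "localhost:9092")
    //         .create()
    //         .expect("failed to create producer");
    //     for _ in 0..1_000 {
    //         producer
    //             .send(BaseRecord::<(), _>::to("queue-test").payload("hello"))
    //             .map_err(|(e, _)| e)
    //             .expect("failed to enqueue message");
    //     }
    //     let _ = producer.flush(Duration::from_secs(30));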
1372    #[mz_ore::test]
1373    #[ignore]
1374    fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
1375        let topic_name = "queue-test";
1376        let pid = 0;
1377
1378        let mut kafka_config = create_new_client_config_simple();
1379        kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
1380        kafka_config.set("enable.auto.commit", "false");
1381        kafka_config.set("group.id", Uuid::new_v4().to_string());
1382        kafka_config.set("fetch.message.max.bytes", "100");
1383        let consumer: BaseConsumer<_> = kafka_config.create()?;
1384
1385        let consumer = Arc::new(consumer);
1386
1387        let mut partition_list = TopicPartitionList::new();
1388        // Using Offset::Beginning here will work fine; only Offset::Offset(0) leads to the race
1389        // condition.
1390        partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;
1391
1392        consumer.assign(&partition_list)?;
1393
1394        let partition_queue = consumer
1395            .split_partition_queue(topic_name, pid)
1396            .expect("missing partition queue");
1397
1398        let expected_messages = 1_000;
1399
1400        let mut common_queue_count = 0;
1401        let mut partition_queue_count = 0;
1402
1403        loop {
1404            if let Some(msg) = consumer.poll(Duration::from_millis(0)) {
1405                match msg {
1406                    Ok(msg) => {
1407                        let _payload =
1408                            std::str::from_utf8(msg.payload().expect("missing payload"))?;
1409                        if partition_queue_count > 0 {
1410                            anyhow::bail!(
1411                                "Got message from common queue after we internally switched to partition queue."
1412                            );
1413                        }
1414
1415                        common_queue_count += 1;
1416                    }
1417                    Err(err) => anyhow::bail!("{}", err),
1418                }
1419            }
1420
1421            match partition_queue.poll(Duration::from_millis(0)) {
1422                Some(Ok(msg)) => {
1423                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
1424                    partition_queue_count += 1;
1425                }
1426                Some(Err(err)) => anyhow::bail!("{}", err),
1427                _ => (),
1428            }
1429
1430            if (common_queue_count + partition_queue_count) == expected_messages {
1431                break;
1432            }
1433        }
1434
1435        assert!(
1436            common_queue_count == 0,
1437            "Got {} out of {} messages from common queue. Partition queue: {}",
1438            common_queue_count,
1439            expected_messages,
1440            partition_queue_count
1441        );
1442
1443        Ok(())
1444    }
1445}
1446
1447/// Fetches the list of partitions and their corresponding high watermarks.
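///
/// A sketch of the expected output shape, with a hypothetical topic and watermarks; roughly
/// speaking, a partition's high watermark is one past the offset of its last message:
///
/// ```ignore
/// let watermarks = fetch_partition_info(&consumer, "my-topic", Duration::from_secs(10))?;
/// // e.g. {0: 1000, 1: 998}
/// ```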
1448fn fetch_partition_info<C: ConsumerContext>(
1449    consumer: &BaseConsumer<C>,
1450    topic: &str,
1451    fetch_timeout: Duration,
1452) -> Result<BTreeMap<PartitionId, HighWatermark>, GetPartitionsError> {
1453    let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;
1454
1455    let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
1456    for pid in pids {
1457        offset_requests.add_partition_offset(topic, pid, Offset::End)?;
1458    }
1459
1460    let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;
1461
1462    let mut result = BTreeMap::new();
1463    for entry in offset_responses.elements() {
1464        let offset = match entry.offset() {
1465            Offset::Offset(offset) => offset,
1466            offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
1467        };
1468
1469        let pid = entry.partition();
1470        let watermark = offset.try_into().expect("invalid negative offset");
1471        result.insert(pid, watermark);
1472    }
1473
1474    Ok(result)
1475}
1476
1477/// An update produced by the metadata fetcher.
1478#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
1479enum MetadataUpdate {
1480    /// The current IDs and high watermarks of all topic partitions.
1481    Partitions(BTreeMap<PartitionId, HighWatermark>),
1482    /// A transient error.
1483    ///
1484    /// Transient errors stall the source until their cause has been resolved.
1485    TransientError(HealthStatus),
1486    /// A definite error.
1487    ///
1488    /// Definite errors cannot be recovered from. They poison the source until the end of time.
1489    DefiniteError(SourceError),
1490}
1491
1492impl MetadataUpdate {
1493    /// Returns the upstream frontier resulting from the metadata update, if any.
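    ///
    /// A sketch of the resulting frontiers, with hypothetical partition ids and watermarks:
    ///
    /// ```ignore
    /// // Partitions {0: 5, 1: 10} yield singleton elements (0, 5) and (1, 10) plus the open
    /// // range after partition 1 at offset 0, which accounts for not-yet-known partitions.
    /// let update = MetadataUpdate::Partitions(btreemap! { 0 => 5, 1 => 10 });
    /// assert!(update.upstream_frontier().is_some());
    /// // A definite error yields the empty frontier; a transient error yields `None`.
    /// ```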
1494    fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
1495        match self {
1496            Self::Partitions(partitions) => {
1497                let max_pid = partitions.keys().last().copied();
1498                let lower = max_pid
1499                    .map(RangeBound::after)
1500                    .unwrap_or(RangeBound::NegInfinity);
1501                let future_ts =
1502                    Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
1503
1504                let mut frontier = Antichain::from_elem(future_ts);
1505                for (pid, high_watermark) in partitions {
1506                    frontier.insert(Partitioned::new_singleton(
1507                        RangeBound::exact(*pid),
1508                        MzOffset::from(*high_watermark),
1509                    ));
1510                }
1511
1512                Some(frontier)
1513            }
1514            Self::DefiniteError(_) => Some(Antichain::new()),
1515            Self::TransientError(_) => None,
1516        }
1517    }
1518}
1519
1520#[derive(Debug, thiserror::Error)]
1521pub enum KafkaHeaderParseError {
1522    #[error("A header with key '{key}' was not found in the message headers")]
1523    KeyNotFound { key: String },
1524    #[error(
1525        "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
1526    )]
1527    Utf8Error { key: String, raw: Vec<u8> },
1528}
1529
1530/// Render the metadata fetcher of a Kafka source.
1531///
1532/// The metadata fetcher is a single-worker operator that is responsible for periodically fetching
1533/// the Kafka topic metadata (partition IDs and high watermarks) and making it available as a
1534/// Timely stream.
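///
/// A sketch of the outputs (names are illustrative):
///
/// ```ignore
/// let (metadata_stream, probe_stream, button) =
///     render_metadata_fetcher(&scope, connection, config);
/// // `metadata_stream` carries `(wall-clock timestamp, MetadataUpdate)` pairs, `probe_stream`
/// // carries upstream-frontier probes, and dropping `button` shuts the fetcher down.
/// ```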
1535fn render_metadata_fetcher<G: Scope<Timestamp = KafkaTimestamp>>(
1536    scope: &G,
1537    connection: KafkaSourceConnection,
1538    config: RawSourceCreationConfig,
1539) -> (
1540    Stream<G, (mz_repr::Timestamp, MetadataUpdate)>,
1541    Stream<G, Probe<KafkaTimestamp>>,
1542    PressOnDropButton,
1543) {
1544    let active_worker_id = usize::cast_from(config.id.hashed());
1545    let is_active_worker = active_worker_id % scope.peers() == scope.index();
1546
1547    let resume_upper = Antichain::from_iter(
1548        config
1549            .source_resume_uppers
1550            .values()
1551            .map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row))
1552            .flatten(),
1553    );
1554
1555    let name = format!("KafkaMetadataFetcher({})", config.id);
1556    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
1557
1558    let (metadata_output, metadata_stream) = builder.new_output();
1559    let (probe_output, probe_stream) = builder.new_output();
1560
1561    let button = builder.build(move |caps| async move {
1562        if !is_active_worker {
1563            return;
1564        }
1565
1566        let [metadata_cap, probe_cap] = caps.try_into().unwrap();
1567
1568        let client_id = connection.client_id(
1569            config.config.config_set(),
1570            &config.config.connection_context,
1571            config.id,
1572        );
1573        let KafkaSourceConnection {
1574            connection,
1575            topic,
1576            topic_metadata_refresh_interval,
1577            ..
1578        } = connection;
1579
1580        let consumer: Result<BaseConsumer<_>, _> = connection
1581            .create_with_context(
1582                &config.config,
1583                MzClientContext::default(),
1584                &btreemap! {
1585                    // Use the user-configured topic metadata refresh
1586                    // interval.
1587                    "topic.metadata.refresh.interval.ms" =>
1588                        topic_metadata_refresh_interval
1589                        .as_millis()
1590                        .to_string(),
1591                    // Allow Kafka monitoring tools to identify this
1592                    // consumer.
1593                    "client.id" => format!("{client_id}-metadata"),
1594                },
1595                InTask::Yes,
1596            )
1597            .await;
1598
1599        let consumer = match consumer {
1600            Ok(consumer) => consumer,
1601            Err(e) => {
1602                let msg = format!(
1603                    "failed creating kafka metadata consumer: {}",
1604                    e.display_with_causes()
1605                );
1606                let status_update = HealthStatusUpdate::halting(msg, None);
1607                let status = match e {
1608                    ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
1609                    _ => HealthStatus::kafka(status_update),
1610                };
1611                let error = MetadataUpdate::TransientError(status);
1612                let timestamp = (config.now_fn)().into();
1613                metadata_output.give(&metadata_cap, (timestamp, error));
1614
1615                // IMPORTANT: wedge forever until the `SuspendAndRestart` is processed.
1616                // Returning would present to the remap operator as progress to the empty
1617                // frontier, which would incorrectly be recorded to the remap shard.
1618                std::future::pending::<()>().await;
1619                unreachable!("pending future never returns");
1620            }
1621        };
1622
1623        let (tx, mut rx) = mpsc::unbounded_channel();
1624        spawn_metadata_thread(config, consumer, topic, tx);
1625
1626        let mut prev_upstream_frontier = resume_upper;
1627
1628        while let Some((timestamp, mut update)) = rx.recv().await {
1629            if prev_upstream_frontier.is_empty() {
1630                return;
1631            }
1632
1633            if let Some(upstream_frontier) = update.upstream_frontier() {
1634                // Topics are identified by name but it's possible that a user recreates a topic
1635                // with the same name. Ideally we'd want to catch all of these cases and
1636                // immediately error out the source, since the data is effectively gone.
1637                // Unfortunately this is not possible without something like KIP-516.
1638                //
1639                // The best we can do is check whether the upstream frontier regressed. This tells
1640                // us that the topic was recreated and now contains fewer offsets and/or fewer
1641                // partitions. Note that we are not able to detect topic recreation if neither of
1642                // the two are true.
1643                if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
1644                    let error = SourceError {
1645                        error: SourceErrorDetails::Other("topic was recreated".into()),
1646                    };
1647                    update = MetadataUpdate::DefiniteError(error);
1648                }
1649            }
1650
1651            if let Some(upstream_frontier) = update.upstream_frontier() {
1652                prev_upstream_frontier = upstream_frontier.clone();
1653
1654                let probe = Probe {
1655                    probe_ts: timestamp,
1656                    upstream_frontier,
1657                };
1658                probe_output.give(&probe_cap, probe);
1659            }
1660
1661            metadata_output.give(&metadata_cap, (timestamp, update));
1662        }
1663    });
1664
1665    (metadata_stream, probe_stream, button.press_on_drop())
1666}
1667
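/// Spawns a dedicated OS thread that periodically fetches partition metadata for `topic` and
/// forwards the resulting [`MetadataUpdate`]s, tagged with their probe timestamp, over `tx`.
/// The thread shuts down once the receiving side of `tx` is dropped.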
1668fn spawn_metadata_thread<C: ConsumerContext>(
1669    config: RawSourceCreationConfig,
1670    consumer: BaseConsumer<TunnelingClientContext<C>>,
1671    topic: String,
1672    tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
1673) {
1674    // Linux thread names are limited to 15 characters, so longer names are truncated to fit.
1675    thread::Builder::new()
1676        .name(format!("kfk-mtdt-{}", config.id))
1677        .spawn(move || {
1678            trace!(
1679                source_id = config.id.to_string(),
1680                worker_id = config.worker_id,
1681                num_workers = config.worker_count,
1682                "kafka metadata thread: starting..."
1683            );
1684
1685            let mut ticker = probe::Ticker::new(
1686                || KAFKA_METADATA_FETCH_INTERVAL.get(config.config.config_set()),
1687                config.now_fn,
1688            );
1689
1690            loop {
1691                let probe_ts = ticker.tick_blocking();
1692                let result = fetch_partition_info(
1693                    &consumer,
1694                    &topic,
1695                    config
1696                        .config
1697                        .parameters
1698                        .kafka_timeout_config
1699                        .fetch_metadata_timeout,
1700                );
1701                trace!(
1702                    source_id = config.id.to_string(),
1703                    worker_id = config.worker_id,
1704                    num_workers = config.worker_count,
1705                    "kafka metadata thread: metadata fetch result: {:?}",
1706                    result
1707                );
1708                let update = match result {
1709                    Ok(partitions) => {
1710                        trace!(
1711                            source_id = config.id.to_string(),
1712                            worker_id = config.worker_id,
1713                            num_workers = config.worker_count,
1714                            "kafka metadata thread: fetched partition metadata info",
1715                        );
1716
1717                        MetadataUpdate::Partitions(partitions)
1718                    }
1719                    Err(GetPartitionsError::TopicDoesNotExist) => {
1720                        let error = SourceError {
1721                            error: SourceErrorDetails::Other("topic was deleted".into()),
1722                        };
1723                        MetadataUpdate::DefiniteError(error)
1724                    }
1725                    Err(e) => {
1726                        let kafka_status = Some(HealthStatusUpdate::stalled(
1727                            format!("{}", e.display_with_causes()),
1728                            None,
1729                        ));
1730
1731                        let ssh_status = consumer.client().context().tunnel_status();
1732                        let ssh_status = match ssh_status {
1733                            SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
1734                            SshTunnelStatus::Errored(e) => {
1735                                Some(HealthStatusUpdate::stalled(e, None))
1736                            }
1737                        };
1738
1739                        MetadataUpdate::TransientError(HealthStatus {
1740                            kafka: kafka_status,
1741                            ssh: ssh_status,
1742                        })
1743                    }
1744                };
1745
1746                if tx.send((probe_ts, update)).is_err() {
1747                    break;
1748                }
1749            }
1750
1751            info!(
1752                source_id = config.id.to_string(),
1753                worker_id = config.worker_id,
1754                num_workers = config.worker_count,
1755                "kafka metadata thread: receiver has gone away; shutting down."
1756            )
1757        })
1758        .unwrap();
1759}