Skip to main content

mz_storage/source/
kafka.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11use std::collections::btree_map::Entry;
12use std::str::{self};
13use std::sync::Arc;
14use std::thread;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use chrono::{DateTime, NaiveDateTime};
19use differential_dataflow::{AsCollection, Hashable};
20use futures::StreamExt;
21use itertools::Itertools;
22use maplit::btreemap;
23use mz_kafka_util::client::{
24    GetPartitionsError, MzClientContext, PartitionId, TunnelingClientContext, get_partitions,
25};
26use mz_ore::assert_none;
27use mz_ore::cast::CastFrom;
28use mz_ore::error::ErrorExt;
29use mz_ore::future::InTask;
30use mz_ore::iter::IteratorExt;
31use mz_repr::adt::timestamp::CheckedTimestamp;
32use mz_repr::{Datum, Diff, GlobalId, Row, adt::jsonb::Jsonb};
33use mz_ssh_util::tunnel::SshTunnelStatus;
34use mz_storage_types::errors::{
35    ContextCreationError, DataflowError, SourceError, SourceErrorDetails,
36};
37use mz_storage_types::sources::kafka::{
38    KafkaMetadataKind, KafkaSourceConnection, KafkaTimestamp, RangeBound,
39};
40use mz_storage_types::sources::{MzOffset, SourceExport, SourceExportDetails, SourceTimestamp};
41use mz_timely_util::antichain::AntichainExt;
42use mz_timely_util::builder_async::{
43    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
44};
45use mz_timely_util::containers::stack::FueledBuilder;
46use mz_timely_util::order::Partitioned;
47use rdkafka::consumer::base_consumer::PartitionQueue;
48use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
49use rdkafka::error::KafkaError;
50use rdkafka::message::{BorrowedMessage, Headers};
51use rdkafka::statistics::Statistics;
52use rdkafka::topic_partition_list::Offset;
53use rdkafka::{ClientContext, Message, TopicPartitionList};
54use serde::{Deserialize, Serialize};
55use timely::PartialOrder;
56use timely::container::CapacityContainerBuilder;
57use timely::dataflow::channels::pact::Pipeline;
58use timely::dataflow::operators::Capability;
59use timely::dataflow::operators::core::Partition;
60use timely::dataflow::operators::vec::Broadcast;
61use timely::dataflow::{Scope, StreamVec};
62use timely::progress::Antichain;
63use timely::progress::Timestamp;
64use tokio::sync::{Notify, mpsc};
65use tracing::{error, info, trace};
66
67use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
68use crate::metrics::source::kafka::KafkaSourceMetrics;
69use crate::source::types::{FuelSize, Probe, SignaledFuture, SourceRender, StackedCollection};
70use crate::source::{RawSourceCreationConfig, SourceMessage, probe};
71use crate::statistics::SourceStatistics;
72
73#[derive(
74    Clone,
75    Debug,
76    Default,
77    PartialEq,
78    Eq,
79    PartialOrd,
80    Ord,
81    Serialize,
82    Deserialize
83)]
84struct HealthStatus {
85    kafka: Option<HealthStatusUpdate>,
86    ssh: Option<HealthStatusUpdate>,
87}
88
89impl HealthStatus {
90    fn kafka(update: HealthStatusUpdate) -> Self {
91        Self {
92            kafka: Some(update),
93            ssh: None,
94        }
95    }
96
97    fn ssh(update: HealthStatusUpdate) -> Self {
98        Self {
99            kafka: None,
100            ssh: Some(update),
101        }
102    }
103}
104
/// Contains all information necessary to ingest data from Kafka.
pub struct KafkaSourceReader {
    /// Name of the topic on which this source is backed on
    topic_name: String,
    /// Name of the source (will have format kafka-source-id)
    source_name: String,
    /// Source global ID
    id: GlobalId,
    /// Kafka consumer for this source. Shared with the
    /// `KafkaResumeUpperProcessor`, which commits offsets through it.
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    /// List of consumers. A consumer should be assigned per partition to guarantee fairness
    partition_consumers: Vec<PartitionConsumer>,
    /// Worker ID
    worker_id: usize,
    /// Total count of workers
    worker_count: usize,
    /// The most recently read offset for each partition known to this source
    /// reader by output-index. An offset of -1 indicates that no prior message
    /// has been read for the given partition.
    last_offsets: BTreeMap<usize, BTreeMap<PartitionId, i64>>,
    /// The offset to start reading from for each partition. Only contains
    /// partitions this worker is responsible for.
    start_offsets: BTreeMap<PartitionId, i64>,
    /// Channel to receive Kafka statistics JSON blobs from the stats callback.
    stats_rx: crossbeam_channel::Receiver<Jsonb>,
    /// A handle to the partition specific metrics
    partition_metrics: KafkaSourceMetrics,
    /// Per partition capabilities used to produce messages
    partition_capabilities: BTreeMap<PartitionId, PartitionCapability>,
}
134
/// The timely capabilities held for a single Kafka partition, allowing the
/// reader to emit data timestamped within that partition's range.
struct PartitionCapability {
    /// The capability of the data produced
    data: Capability<KafkaTimestamp>,
}

/// The high watermark offsets of a Kafka partition.
///
/// This is the offset of the latest message in the topic/partition available for consumption + 1.
type HighWatermark = u64;
144
/// Processes `resume_uppers` stream updates, committing them upstream and
/// storing them in the `progress_statistics` to be emitted later.
pub struct KafkaResumeUpperProcessor {
    /// Configuration of the source this processor belongs to.
    config: RawSourceCreationConfig,
    /// The Kafka topic offsets are committed against.
    topic_name: String,
    /// Consumer shared with the reader; used to perform the offset commits.
    consumer: Arc<BaseConsumer<TunnelingClientContext<GlueConsumerContext>>>,
    /// Statistics handles for every export of this source, updated as
    /// frontiers are processed.
    statistics: Vec<SourceStatistics>,
}
153
154/// Computes whether this worker is responsible for consuming a partition. It assigns partitions to
155/// workers in a round-robin fashion, starting at an arbitrary worker based on the hash of the
156/// source id.
157fn responsible_for_pid(config: &RawSourceCreationConfig, pid: i32) -> bool {
158    let pid = usize::try_from(pid).expect("positive pid");
159    ((config.responsible_worker(config.id) + pid) % config.worker_count) == config.worker_id
160}
161
/// Per-export information the reader needs to produce data for one source
/// export (output) of this Kafka source.
struct SourceOutputInfo {
    /// Global ID of the source export.
    id: GlobalId,
    /// Index of this export's output stream; used to route messages when the
    /// single reader output is partitioned into per-export collections.
    output_index: usize,
    /// The frontier this export resumes from, decoded from the source resume
    /// uppers.
    resume_upper: Antichain<KafkaTimestamp>,
    /// Kinds of Kafka metadata columns this export requests.
    metadata_columns: Vec<KafkaMetadataKind>,
}
168
169impl SourceRender for KafkaSourceConnection {
170    // TODO(petrosagg): The type used for the partition (RangeBound<PartitionId>) doesn't need to
171    // be so complicated and we could instead use `Partitioned<PartitionId, Option<u64>>` where all
172    // ranges are inclusive and a time of `None` signifies that a particular partition is not
173    // present. This requires an shard migration of the remap shard.
174    type Time = KafkaTimestamp;
175
176    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Kafka;
177
178    fn render<'scope>(
179        self,
180        scope: Scope<'scope, KafkaTimestamp>,
181        config: &RawSourceCreationConfig,
182        resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
183        start_signal: impl std::future::Future<Output = ()> + 'static,
184    ) -> (
185        BTreeMap<
186            GlobalId,
187            StackedCollection<'scope, KafkaTimestamp, Result<SourceMessage, DataflowError>>,
188        >,
189        StreamVec<'scope, KafkaTimestamp, HealthStatusMessage>,
190        StreamVec<'scope, KafkaTimestamp, Probe<KafkaTimestamp>>,
191        Vec<PressOnDropButton>,
192    ) {
193        let (metadata, probes, metadata_token) =
194            render_metadata_fetcher(scope, self.clone(), config.clone());
195        let (data, health, reader_token) = render_reader(
196            scope,
197            self,
198            config.clone(),
199            resume_uppers,
200            metadata,
201            start_signal,
202        );
203
204        let partition_count = u64::cast_from(config.source_exports.len());
205        let data_streams: Vec<_> = data.inner.partition::<CapacityContainerBuilder<_>, _, _>(
206            partition_count,
207            |((output, data), time, diff)| {
208                let output = u64::cast_from(output);
209                (output, (data, time, diff))
210            },
211        );
212        let mut data_collections = BTreeMap::new();
213        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
214            data_collections.insert(*id, data_stream.as_collection());
215        }
216
217        (
218            data_collections,
219            health,
220            probes,
221            vec![metadata_token, reader_token],
222        )
223    }
224}
225
226/// Render the reader of a Kafka source.
227///
228/// The reader is responsible for polling the Kafka topic partitions for new messages, and
229/// transforming them into a `SourceMessage` collection.
230fn render_reader<'scope>(
231    scope: Scope<'scope, KafkaTimestamp>,
232    connection: KafkaSourceConnection,
233    config: RawSourceCreationConfig,
234    resume_uppers: impl futures::Stream<Item = Antichain<KafkaTimestamp>> + 'static,
235    metadata_stream: StreamVec<'scope, KafkaTimestamp, (mz_repr::Timestamp, MetadataUpdate)>,
236    start_signal: impl std::future::Future<Output = ()> + 'static,
237) -> (
238    StackedCollection<'scope, KafkaTimestamp, (usize, Result<SourceMessage, DataflowError>)>,
239    StreamVec<'scope, KafkaTimestamp, HealthStatusMessage>,
240    PressOnDropButton,
241) {
242    let name = format!("KafkaReader({})", config.id);
243    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
244
245    let (data_output, stream) = builder.new_output::<FueledBuilder<_>>();
246    let (health_output, health_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
247
248    let mut metadata_input = builder.new_disconnected_input(metadata_stream.broadcast(), Pipeline);
249
250    let mut outputs = vec![];
251
252    // Contains the `SourceStatistics` entries for exports that require a snapshot.
253    let mut all_export_stats = vec![];
254    let mut snapshot_export_stats = vec![];
255    for (idx, (id, export)) in config.source_exports.iter().enumerate() {
256        let SourceExport {
257            details,
258            storage_metadata: _,
259            data_config: _,
260        } = export;
261        let resume_upper = Antichain::from_iter(
262            config
263                .source_resume_uppers
264                .get(id)
265                .expect("all source exports must be present in source resume uppers")
266                .iter()
267                .map(Partitioned::<RangeBound<PartitionId>, MzOffset>::decode_row),
268        );
269
270        let metadata_columns = match details {
271            SourceExportDetails::Kafka(details) => details
272                .metadata_columns
273                .iter()
274                .map(|(_name, kind)| kind.clone())
275                .collect::<Vec<_>>(),
276            _ => panic!("unexpected source export details: {:?}", details),
277        };
278
279        let statistics = config
280            .statistics
281            .get(id)
282            .expect("statistics have been initialized")
283            .clone();
284        // export requires snapshot
285        if resume_upper.as_ref() == &[Partitioned::minimum()] {
286            snapshot_export_stats.push(statistics.clone());
287        }
288        all_export_stats.push(statistics);
289
290        let output = SourceOutputInfo {
291            id: *id,
292            resume_upper,
293            output_index: idx,
294            metadata_columns,
295        };
296        outputs.push(output);
297    }
298
299    let busy_signal = Arc::clone(&config.busy_signal);
300    let button = builder.build(move |caps| {
301        SignaledFuture::new(busy_signal, async move {
302            let [mut data_cap, health_cap] = caps.try_into().unwrap();
303
304            let client_id = connection.client_id(
305                config.config.config_set(),
306                &config.config.connection_context,
307                config.id,
308            );
309            let group_id = connection.group_id(&config.config.connection_context, config.id);
310            let KafkaSourceConnection {
311                connection,
312                topic,
313                topic_metadata_refresh_interval,
314                start_offsets,
315                metadata_columns: _,
316                // Exhaustive match protects against forgetting to apply an
317                // option. Ignored fields are justified below.
318                connection_id: _,   // not needed here
319                group_id_prefix: _, // used above via `connection.group_id`
320            } = connection;
321
322            // Start offsets is a map from partition to the next offset to read from.
323            let mut start_offsets: BTreeMap<_, i64> = start_offsets
324                .clone()
325                .into_iter()
326                .filter(|(pid, _offset)| responsible_for_pid(&config, *pid))
327                .map(|(k, v)| (k, v))
328                .collect();
329
330            let mut partition_capabilities = BTreeMap::new();
331            let mut max_pid = None;
332            let resume_upper = Antichain::from_iter(
333                outputs
334                    .iter()
335                    .map(|output| output.resume_upper.clone())
336                    .flatten(),
337            );
338
339            for ts in resume_upper.elements() {
340                if let Some(pid) = ts.interval().singleton() {
341                    let pid = pid.unwrap_exact();
342                    max_pid = std::cmp::max(max_pid, Some(*pid));
343                    if responsible_for_pid(&config, *pid) {
344                        let restored_offset = i64::try_from(ts.timestamp().offset)
345                            .expect("restored kafka offsets must fit into i64");
346                        if let Some(start_offset) = start_offsets.get_mut(pid) {
347                            *start_offset = std::cmp::max(restored_offset, *start_offset);
348                        } else {
349                            start_offsets.insert(*pid, restored_offset);
350                        }
351
352                        let part_ts = Partitioned::new_singleton(
353                            RangeBound::exact(*pid),
354                            ts.timestamp().clone(),
355                        );
356                        let part_cap = PartitionCapability {
357                            data: data_cap.delayed(&part_ts),
358                        };
359                        partition_capabilities.insert(*pid, part_cap);
360                    }
361                }
362            }
363            let lower = max_pid
364                .map(RangeBound::after)
365                .unwrap_or(RangeBound::NegInfinity);
366            let future_ts =
367                Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
368            data_cap.downgrade(&future_ts);
369
370            info!(
371                source_id = config.id.to_string(),
372                worker_id = config.worker_id,
373                num_workers = config.worker_count,
374                "instantiating Kafka source reader at offsets {start_offsets:?}"
375            );
376
377            let (stats_tx, stats_rx) = crossbeam_channel::unbounded();
378            let notificator = Arc::new(Notify::new());
379
380            let consumer: Result<BaseConsumer<_>, _> = connection
381                .create_with_context(
382                    &config.config,
383                    GlueConsumerContext {
384                        notificator: Arc::clone(&notificator),
385                        stats_tx,
386                        inner: MzClientContext::default(),
387                    },
388                    &btreemap! {
389                        // Disable Kafka auto commit. We manually commit offsets
390                        // to Kafka once we have reclocked those offsets, so
391                        // that users can use standard Kafka tools for progress
392                        // tracking.
393                        "enable.auto.commit" => "false".into(),
394                        // Always begin ingest at 0 when restarted, even if Kafka
395                        // contains committed consumer read offsets
396                        "auto.offset.reset" => "earliest".into(),
397                        // Use the user-configured topic metadata refresh
398                        // interval.
399                        "topic.metadata.refresh.interval.ms" =>
400                            topic_metadata_refresh_interval
401                            .as_millis()
402                            .to_string(),
403                        // TODO: document the rationale for this.
404                        "fetch.message.max.bytes" => "134217728".into(),
405                        // Consumer group ID, which may have been overridden by
406                        // the user. librdkafka requires this, and we use offset
407                        // committing to provide a way for users to monitor
408                        // ingest progress, though we do not rely on the
409                        // committed offsets for any functionality.
410                        "group.id" => group_id.clone(),
411                        // Allow Kafka monitoring tools to identify this
412                        // consumer.
413                        "client.id" => client_id.clone(),
414                    },
415                    InTask::Yes,
416                )
417                .await;
418
419            let consumer = match consumer {
420                Ok(consumer) => Arc::new(consumer),
421                Err(e) => {
422                    let update = HealthStatusUpdate::halting(
423                        format!(
424                            "failed creating kafka reader consumer: {}",
425                            e.display_with_causes()
426                        ),
427                        None,
428                    );
429                    health_output.give(
430                        &health_cap,
431                        HealthStatusMessage {
432                            id: None,
433                            namespace: if matches!(e, ContextCreationError::Ssh(_)) {
434                                StatusNamespace::Ssh
435                            } else {
436                                StatusNamespace::Kafka
437                            },
438                            update: update.clone(),
439                        },
440                    );
441                    for (output, update) in outputs.iter().repeat_clone(update) {
442                        health_output.give(
443                            &health_cap,
444                            HealthStatusMessage {
445                                id: Some(output.id),
446                                namespace: if matches!(e, ContextCreationError::Ssh(_)) {
447                                    StatusNamespace::Ssh
448                                } else {
449                                    StatusNamespace::Kafka
450                                },
451                                update,
452                            },
453                        );
454                    }
455                    // IMPORTANT: wedge forever until the `SuspendAndRestart` is processed.
456                    // Returning would incorrectly present to the remap operator as progress to the
457                    // empty frontier which would be incorrectly recorded to the remap shard.
458                    std::future::pending::<()>().await;
459                    unreachable!("pending future never returns");
460                }
461            };
462
463            // Note that we wait for this AFTER we downgrade to the source `resume_upper`. This
464            // allows downstream operators (namely, the `reclock_operator`) to downgrade to the
465            // `resume_upper`, which is necessary for this basic form of backpressure to work.
466            start_signal.await;
467            info!(
468                source_id = config.id.to_string(),
469                worker_id = config.worker_id,
470                num_workers = config.worker_count,
471                "kafka worker noticed rehydration is finished, starting partition queues..."
472            );
473
474            let partition_ids = start_offsets.keys().copied().collect();
475            let offset_commit_metrics = config.metrics.get_offset_commit_metrics(config.id);
476
477            let mut reader = KafkaSourceReader {
478                topic_name: topic.clone(),
479                source_name: config.name.clone(),
480                id: config.id,
481                partition_consumers: Vec::new(),
482                consumer: Arc::clone(&consumer),
483                worker_id: config.worker_id,
484                worker_count: config.worker_count,
485                last_offsets: outputs
486                    .iter()
487                    .map(|output| (output.output_index, BTreeMap::new()))
488                    .collect(),
489                start_offsets,
490                stats_rx,
491                partition_metrics: config.metrics.get_kafka_source_metrics(
492                    partition_ids,
493                    topic.clone(),
494                    config.id,
495                ),
496                partition_capabilities,
497            };
498
499            let offset_committer = KafkaResumeUpperProcessor {
500                config: config.clone(),
501                topic_name: topic.clone(),
502                consumer,
503                statistics: all_export_stats.clone(),
504            };
505
506            // Seed the progress metrics with `0` if we are snapshotting.
507            if !snapshot_export_stats.is_empty() {
508                if let Err(e) = offset_committer
509                    .process_frontier(resume_upper.clone())
510                    .await
511                {
512                    offset_commit_metrics.offset_commit_failures.inc();
513                    tracing::warn!(
514                        %e,
515                        "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
516                        worker_id = config.worker_id,
517                        source_id = config.id,
518                        upper = resume_upper.pretty()
519                    );
520                }
521                // Reset snapshot statistics for any exports that are not involved
522                // in this round of snapshotting. Those that are snapshotting this round will
523                // see updates as the snapshot commences.
524                for statistics in config.statistics.values() {
525                    statistics.set_snapshot_records_known(0);
526                    statistics.set_snapshot_records_staged(0);
527                }
528            }
529
530            let resume_uppers_process_loop = async move {
531                tokio::pin!(resume_uppers);
532                while let Some(frontier) = resume_uppers.next().await {
533                    if let Err(e) = offset_committer.process_frontier(frontier.clone()).await {
534                        offset_commit_metrics.offset_commit_failures.inc();
535                        tracing::warn!(
536                            %e,
537                            "timely-{worker_id} source({source_id}) failed to commit offsets: resume_upper={upper}",
538                            worker_id = config.worker_id,
539                            source_id = config.id,
540                            upper = frontier.pretty()
541                        );
542                    }
543                }
544                // During dataflow shutdown this loop can end due to the general chaos caused by
545                // dropping tokens as a means to shutdown. This call ensures this future never ends
546                // and we instead rely on this operator being dropped altogether when *its* token
547                // is dropped.
548                std::future::pending::<()>().await;
549            };
550            tokio::pin!(resume_uppers_process_loop);
551
552            let mut metadata_update: Option<MetadataUpdate> = None;
553            let mut snapshot_total = None;
554
555            let max_wait_time =
556                mz_storage_types::dyncfgs::KAFKA_POLL_MAX_WAIT.get(config.config.config_set());
557            loop {
558                // Wait for data or metadata events while also making progress with offset
559                // committing.
560                tokio::select! {
561                    // TODO(petrosagg): remove the timeout and rely purely on librdkafka waking us
562                    // up
563                    _ = tokio::time::timeout(max_wait_time, notificator.notified()) => {},
564
565                    _ = metadata_input.ready() => {
566                        // Collect all pending updates, then only keep the most recent one.
567                        let mut updates = Vec::new();
568                        while let Some(event) = metadata_input.next_sync() {
569                            if let Event::Data(_, mut data) = event {
570                                updates.append(&mut data);
571                            }
572                        }
573                        metadata_update = updates
574                            .into_iter()
575                            .max_by_key(|(ts, _)| *ts)
576                            .map(|(_, update)| update);
577                    }
578
579                    // This future is not cancel safe but we are only passing a reference to it in
580                    // the select! loop so the future stays on the stack and never gets cancelled
581                    // until the end of the function.
582                    _ = resume_uppers_process_loop.as_mut() => {},
583                }
584
585                match metadata_update.take() {
586                    Some(MetadataUpdate::Partitions(partitions)) => {
587                        let max_pid = partitions.keys().last().cloned();
588                        let lower = max_pid
589                            .map(RangeBound::after)
590                            .unwrap_or(RangeBound::NegInfinity);
591                        let future_ts = Partitioned::new_range(
592                            lower,
593                            RangeBound::PosInfinity,
594                            MzOffset::from(0),
595                        );
596
597                        let mut offset_known = 0;
598                        for (&pid, &high_watermark) in &partitions {
599                            if responsible_for_pid(&config, pid) {
600                                offset_known += high_watermark;
601                                reader.ensure_partition(pid);
602                                if let Entry::Vacant(entry) =
603                                    reader.partition_capabilities.entry(pid)
604                                {
605                                    let start_offset = match reader.start_offsets.get(&pid) {
606                                        Some(&offset) => offset.try_into().unwrap(),
607                                        None => 0u64,
608                                    };
609                                    let part_since_ts = Partitioned::new_singleton(
610                                        RangeBound::exact(pid),
611                                        MzOffset::from(start_offset),
612                                    );
613
614                                    entry.insert(PartitionCapability {
615                                        data: data_cap.delayed(&part_since_ts),
616                                    });
617                                }
618                            }
619                        }
620
621                        // If we are snapshotting, record our first set of partitions as the snapshot
622                        // size.
623                        if !snapshot_export_stats.is_empty() && snapshot_total.is_none() {
624                            // Note that we want to represent the _number of offsets_, which
625                            // means the watermark's frontier semantics is correct, without
626                            // subtracting (Kafka offsets start at 0).
627                            snapshot_total = Some(offset_known);
628                        }
629
630                        // Clear all the health namespaces we know about.
631                        // Note that many kafka sources's don't have an ssh tunnel, but the
632                        // `health_operator` handles this fine.
633                        for output in &outputs {
634                            for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
635                                health_output.give(
636                                    &health_cap,
637                                    HealthStatusMessage {
638                                        id: Some(output.id),
639                                        namespace,
640                                        update: HealthStatusUpdate::running(),
641                                    },
642                                );
643                            }
644                        }
645                        for namespace in [StatusNamespace::Kafka, StatusNamespace::Ssh] {
646                            health_output.give(
647                                &health_cap,
648                                HealthStatusMessage {
649                                    id: None,
650                                    namespace,
651                                    update: HealthStatusUpdate::running(),
652                                },
653                            );
654                        }
655
656                        for export_stat in all_export_stats.iter() {
657                            export_stat.set_offset_known(offset_known);
658                        }
659
660                        data_cap.downgrade(&future_ts);
661                    }
662                    Some(MetadataUpdate::TransientError(status)) => {
663                        if let Some(update) = status.kafka {
664                            health_output.give(
665                                &health_cap,
666                                HealthStatusMessage {
667                                    id: None,
668                                    namespace: StatusNamespace::Kafka,
669                                    update: update.clone(),
670                                },
671                            );
672                            for (output, update) in outputs.iter().repeat_clone(update) {
673                                health_output.give(
674                                    &health_cap,
675                                    HealthStatusMessage {
676                                        id: Some(output.id),
677                                        namespace: StatusNamespace::Kafka,
678                                        update,
679                                    },
680                                );
681                            }
682                        }
683                        if let Some(update) = status.ssh {
684                            health_output.give(
685                                &health_cap,
686                                HealthStatusMessage {
687                                    id: None,
688                                    namespace: StatusNamespace::Ssh,
689                                    update: update.clone(),
690                                },
691                            );
692                            for (output, update) in outputs.iter().repeat_clone(update) {
693                                health_output.give(
694                                    &health_cap,
695                                    HealthStatusMessage {
696                                        id: Some(output.id),
697                                        namespace: StatusNamespace::Ssh,
698                                        update,
699                                    },
700                                );
701                            }
702                        }
703                    }
704                    Some(MetadataUpdate::DefiniteError(error)) => {
705                        health_output.give(
706                            &health_cap,
707                            HealthStatusMessage {
708                                id: None,
709                                namespace: StatusNamespace::Kafka,
710                                update: HealthStatusUpdate::stalled(
711                                    error.to_string(),
712                                    None,
713                                ),
714                            },
715                        );
716                        let error = Err(error.into());
717                        let time = data_cap.time().clone();
718                        for (output, error) in
719                            outputs.iter().map(|o| o.output_index).repeat_clone(error)
720                        {
721                            let update = ((output, error), time, Diff::ONE);
722                            let size = update.fuel_size();
723                            data_output
724                                .give_fueled(&data_cap, update, size)
725                                .await;
726                        }
727
728                        return;
729                    }
730                    None => {}
731                }
732
733                // Poll the consumer once. We split the consumer's partitions out into separate
734                // queues and poll those individually, but it's still necessary to drive logic that
735                // consumes from rdkafka's internal event queue, such as statistics callbacks.
736                //
737                // Additionally, assigning topics and splitting them off into separate queues is
738                // not atomic, so we expect to see at least some messages to show up when polling
739                // the consumer directly.
740                while let Some(result) = reader.consumer.poll(Duration::from_secs(0)) {
741                    match result {
742                        Err(e) => {
743                            let error = format!(
744                                "kafka error when polling consumer for source: {} topic: {} : {}",
745                                reader.source_name, reader.topic_name, e
746                            );
747                            let status = HealthStatusUpdate::stalled(error, None);
748                            health_output.give(
749                                &health_cap,
750                                HealthStatusMessage {
751                                    id: None,
752                                    namespace: StatusNamespace::Kafka,
753                                    update: status.clone(),
754                                },
755                            );
756                            for (output, status) in outputs.iter().repeat_clone(status) {
757                                health_output.give(
758                                    &health_cap,
759                                    HealthStatusMessage {
760                                        id: Some(output.id),
761                                        namespace: StatusNamespace::Kafka,
762                                        update: status,
763                                    },
764                                );
765                            }
766                        }
767                        Ok(message) => {
768                            let output_messages = outputs
769                                .iter()
770                                .map(|output| {
771                                    let (message, ts) = construct_source_message(
772                                        &message,
773                                        &output.metadata_columns,
774                                    );
775                                    (output.output_index, message, ts)
776                                })
777                                // This vec allocation is required to allow obtaining a `&mut`
778                                // on `reader` for the `reader.handle_message` call in the
779                                // loop below since  `message` is borrowed from `reader`.
780                                .collect::<Vec<_>>();
781                            for (output_index, message, ts) in output_messages {
782                                if let Some((msg, time, diff)) =
783                                    reader.handle_message(message, ts, &output_index)
784                                {
785                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
786                                    let part_cap = &reader.partition_capabilities[pid].data;
787                                    let msg = msg.map_err(|e| {
788                                        DataflowError::SourceError(Box::new(SourceError {
789                                            error: SourceErrorDetails::Other(e.to_string().into()),
790                                        }))
791                                    });
792                                    let update = ((output_index, msg), time, diff);
793                                    let size = update.fuel_size();
794                                    data_output
795                                        .give_fueled(part_cap, update, size)
796                                        .await;
797                                }
798                            }
799                        }
800                    }
801                }
802
803                reader.update_stats();
804
805                // Take the consumers temporarily to get around borrow checker errors
806                let mut consumers = std::mem::take(&mut reader.partition_consumers);
807                for consumer in consumers.iter_mut() {
808                    let pid = consumer.pid();
809                    // We want to make sure the rest of the actions in the outer loops get
810                    // a chance to run. If rdkafka keeps pumping data at us we might find
811                    // ourselves in a situation where we keep dumping data into the
812                    // dataflow without signaling progress. For this reason we consume at most
813                    // 10k messages from each partition and go around the loop.
814                    let mut partition_exhausted = false;
815                    for _ in 0..10_000 {
816                        let Some(message) = consumer.get_next_message().transpose() else {
817                            partition_exhausted = true;
818                            break;
819                        };
820
821                        for output in outputs.iter() {
822                            let message = match &message {
823                                Ok((msg, pid)) => {
824                                    let (msg, ts) =
825                                        construct_source_message(msg, &output.metadata_columns);
826                                    assert_eq!(*pid, ts.0);
827                                    Ok(reader.handle_message(msg, ts, &output.output_index))
828                                }
829                                Err(err) => Err(err),
830                            };
831                            match message {
832                                Ok(Some((msg, time, diff))) => {
833                                    let pid = time.interval().singleton().unwrap().unwrap_exact();
834                                    let part_cap = &reader.partition_capabilities[pid].data;
835                                    let msg = msg.map_err(|e| {
836                                        DataflowError::SourceError(Box::new(SourceError {
837                                            error: SourceErrorDetails::Other(e.to_string().into()),
838                                        }))
839                                    });
840                                    let update =
841                                        ((output.output_index, msg), time, diff);
842                                    let size = update.fuel_size();
843                                    data_output
844                                        .give_fueled(part_cap, update, size)
845                                        .await;
846                                }
847                                // The message was from an offset we've already seen.
848                                Ok(None) => continue,
849                                Err(err) => {
850                                    let last_offset = reader
851                                        .last_offsets
852                                        .get(&output.output_index)
853                                        .expect("output known to be installed")
854                                        .get(&pid)
855                                        .expect("partition known to be installed");
856
857                                    let status = HealthStatusUpdate::stalled(
858                                        format!(
859                                            "error consuming from source: {} topic: {topic}:\
860                                             partition: {pid} last processed offset:\
861                                             {last_offset} : {err}",
862                                            config.name
863                                        ),
864                                        None,
865                                    );
866                                    health_output.give(
867                                        &health_cap,
868                                        HealthStatusMessage {
869                                            id: None,
870                                            namespace: StatusNamespace::Kafka,
871                                            update: status.clone(),
872                                        },
873                                    );
874                                    health_output.give(
875                                        &health_cap,
876                                        HealthStatusMessage {
877                                            id: Some(output.id),
878                                            namespace: StatusNamespace::Kafka,
879                                            update: status,
880                                        },
881                                    );
882                                }
883                            }
884                        }
885                    }
886                    if !partition_exhausted {
887                        notificator.notify_one();
888                    }
889                }
890                // We can now put them back
891                assert!(reader.partition_consumers.is_empty());
892                reader.partition_consumers = consumers;
893
894                let positions = reader.consumer.position().unwrap();
895                let topic_positions = positions.elements_for_topic(&reader.topic_name);
896                let mut snapshot_staged = 0;
897
898                for position in topic_positions {
899                    // The offset begins in the `Offset::Invalid` state in which case we simply
900                    // skip this partition.
901                    if let Offset::Offset(offset) = position.offset() {
902                        let pid = position.partition();
903                        let upper_offset = MzOffset::from(u64::try_from(offset).unwrap());
904                        let upper =
905                            Partitioned::new_singleton(RangeBound::exact(pid), upper_offset);
906
907                        let part_cap = reader.partition_capabilities.get_mut(&pid).unwrap();
908                        match part_cap.data.try_downgrade(&upper) {
909                            Ok(()) => {
910                                if !snapshot_export_stats.is_empty() {
911                                    // The `.position()` of the consumer represents what offset we have
912                                    // read up to.
913                                    snapshot_staged += offset.try_into().unwrap_or(0u64);
914                                    // This will always be `Some` at this point.
915                                    if let Some(snapshot_total) = snapshot_total {
916                                        // We will eventually read past the snapshot total, so we need
917                                        // to bound it here.
918                                        snapshot_staged =
919                                            std::cmp::min(snapshot_staged, snapshot_total);
920                                    }
921                                }
922                            }
923                            Err(_) => {
924                                // If we can't downgrade, it means we have already seen this offset.
925                                // This is expected and we can safely ignore it.
926                                info!(
927                                    source_id = config.id.to_string(),
928                                    worker_id = config.worker_id,
929                                    num_workers = config.worker_count,
930                                    "kafka source frontier downgrade skipped due to already \
931                                     seen offset: {:?}",
932                                    upper
933                                );
934                            }
935                        };
936
937                    }
938                }
939
940                if let (Some(snapshot_total), true) =
941                    (snapshot_total, !snapshot_export_stats.is_empty())
942                {
943                    for export_stat in snapshot_export_stats.iter() {
944                        export_stat.set_snapshot_records_known(snapshot_total);
945                        export_stat.set_snapshot_records_staged(snapshot_staged);
946                    }
947                    if snapshot_total == snapshot_staged {
948                        snapshot_export_stats.clear();
949                    }
950                }
951            }
952        })
953    });
954
955    (
956        stream.as_collection(),
957        health_stream,
958        button.press_on_drop(),
959    )
960}
961
962impl KafkaResumeUpperProcessor {
963    async fn process_frontier(
964        &self,
965        frontier: Antichain<KafkaTimestamp>,
966    ) -> Result<(), anyhow::Error> {
967        use rdkafka::consumer::CommitMode;
968
969        // Generate a list of partitions that this worker is responsible for
970        let mut offsets = vec![];
971        let mut offset_committed = 0;
972        for ts in frontier.iter() {
973            if let Some(pid) = ts.interval().singleton() {
974                let pid = pid.unwrap_exact();
975                if responsible_for_pid(&self.config, *pid) {
976                    offsets.push((pid.clone(), *ts.timestamp()));
977
978                    // Note that we do not subtract 1 from the frontier. Imagine
979                    // that frontier is 2 for this pid. That means we have
980                    // full processed offset 0 and offset 1, which means we have
981                    // processed _2_ offsets.
982                    offset_committed += ts.timestamp().offset;
983                }
984            }
985        }
986
987        for export_stat in self.statistics.iter() {
988            export_stat.set_offset_committed(offset_committed);
989        }
990
991        if !offsets.is_empty() {
992            let mut tpl = TopicPartitionList::new();
993            for (pid, offset) in offsets {
994                let offset_to_commit =
995                    Offset::Offset(offset.offset.try_into().expect("offset to be vald i64"));
996                tpl.add_partition_offset(&self.topic_name, pid, offset_to_commit)
997                    .expect("offset known to be valid");
998            }
999            let consumer = Arc::clone(&self.consumer);
1000            mz_ore::task::spawn_blocking(
1001                || format!("source({}) kafka offset commit", self.config.id),
1002                move || consumer.commit(&tpl, CommitMode::Sync),
1003            )
1004            .await?;
1005        }
1006        Ok(())
1007    }
1008}
1009
impl KafkaSourceReader {
    /// Ensures that a partition queue for `pid` exists.
    ///
    /// No-op when the source has no outputs or when the partition is already
    /// tracked. Otherwise this creates a dedicated partition queue starting at
    /// the configured start offset (default 0) and seeds per-output offset
    /// tracking for the new partition.
    fn ensure_partition(&mut self, pid: PartitionId) {
        if self.last_offsets.is_empty() {
            tracing::info!(
                source_id = %self.id,
                worker_id = %self.worker_id,
                "kafka source does not have any outputs, not creating partition queue");

            return;
        }
        for last_offsets in self.last_offsets.values() {
            // early exit if we've already inserted this partition
            if last_offsets.contains_key(&pid) {
                return;
            }
        }

        let start_offset = self.start_offsets.get(&pid).copied().unwrap_or(0);
        self.create_partition_queue(pid, Offset::Offset(start_offset));

        // Seed the last seen offset to one before the start offset so that the
        // first message at `start_offset` is accepted by `handle_message`.
        for last_offsets in self.last_offsets.values_mut() {
            let prev = last_offsets.insert(pid, start_offset - 1);
            assert_none!(prev);
        }
    }

    /// Creates a new partition queue for `partition_id`.
    ///
    /// Extends the consumer's assignment with the new partition and splits a
    /// dedicated queue off for it. Because re-assigning invalidates previously
    /// split queues (librdkafka >= 1.6.0), all existing partition queues are
    /// recreated as well.
    fn create_partition_queue(&mut self, partition_id: PartitionId, initial_offset: Offset) {
        info!(
            source_id = self.id.to_string(),
            worker_id = self.worker_id,
            num_workers = self.worker_count,
            "activating Kafka queue for topic {}, partition {}",
            self.topic_name,
            partition_id,
        );

        // Collect old partition assignments
        let tpl = self.consumer.assignment().unwrap();
        // Create list from assignments
        let mut partition_list = TopicPartitionList::new();
        for partition in tpl.elements_for_topic(&self.topic_name) {
            partition_list
                .add_partition_offset(partition.topic(), partition.partition(), partition.offset())
                .expect("offset known to be valid");
        }
        // Add new partition
        partition_list
            .add_partition_offset(&self.topic_name, partition_id, initial_offset)
            .expect("offset known to be valid");
        self.consumer
            .assign(&partition_list)
            .expect("assignment known to be valid");

        // Since librdkafka v1.6.0, we need to recreate all partition queues
        // after every call to `self.consumer.assign`.
        let context = Arc::clone(self.consumer.context());
        for pc in &mut self.partition_consumers {
            pc.partition_queue = self
                .consumer
                .split_partition_queue(&self.topic_name, pc.pid)
                .expect("partition known to be valid");
            // Re-arm the wakeup callback so new data on the queue reactivates
            // the source operator.
            pc.partition_queue.set_nonempty_callback({
                let context = Arc::clone(&context);
                move || context.inner().activate()
            });
        }

        let mut partition_queue = self
            .consumer
            .split_partition_queue(&self.topic_name, partition_id)
            .expect("partition known to be valid");
        partition_queue.set_nonempty_callback(move || context.inner().activate());
        self.partition_consumers
            .push(PartitionConsumer::new(partition_id, partition_queue));
        // Invariant: one partition consumer per assigned partition of this
        // topic.
        assert_eq!(
            self.consumer
                .assignment()
                .unwrap()
                .elements_for_topic(&self.topic_name)
                .len(),
            self.partition_consumers.len()
        );
    }

    /// Read any statistics JSON blobs generated via the rdkafka statistics callback.
    ///
    /// Drains the channel fed by `GlueConsumerContext::stats_raw` and records
    /// each partition's high watermark in the partition metrics. Decoding
    /// failures are logged and otherwise ignored.
    fn update_stats(&mut self) {
        while let Ok(stats) = self.stats_rx.try_recv() {
            match serde_json::from_str::<Statistics>(&stats.to_string()) {
                Ok(statistics) => {
                    let topic = statistics.topics.get(&self.topic_name);
                    match topic {
                        Some(topic) => {
                            for (id, partition) in &topic.partitions {
                                self.partition_metrics
                                    .set_offset_max(*id, partition.hi_offset);
                            }
                        }
                        None => error!("No stats found for topic: {}", &self.topic_name),
                    }
                }
                Err(e) => {
                    error!("failed decoding librdkafka statistics JSON: {}", e);
                }
            }
        }
    }

    /// Checks if the given message is viable for emission. This checks if the message offset is
    /// past the expected offset and returns None if it is not.
    ///
    /// On success, advances the per-(output, partition) last-seen offset and
    /// returns the message with its partitioned timestamp and a `Diff::ONE`.
    fn handle_message(
        &mut self,
        message: Result<SourceMessage, KafkaHeaderParseError>,
        (partition, offset): (PartitionId, MzOffset),
        output_index: &usize,
    ) -> Option<(
        Result<SourceMessage, KafkaHeaderParseError>,
        KafkaTimestamp,
        Diff,
    )> {
        // Offsets are guaranteed to be 1) monotonically increasing *unless* there is
        // a network issue or a new partition added, at which point the consumer may
        // start processing the topic from the beginning, or we may see duplicate offsets
        // At all times, the guarantee : if we see offset x, we have seen all offsets [0,x-1]
        // that we are ever going to see holds.
        // Offsets are guaranteed to be contiguous when compaction is disabled. If compaction
        // is enabled, there may be gaps in the sequence.
        // If we see an "old" offset, we skip that message.

        // Given the explicit consumer to partition assignment, we should never receive a message
        // for a partition for which we have no metadata
        assert!(
            self.last_offsets
                .get(output_index)
                .unwrap()
                .contains_key(&partition)
        );

        let last_offset_ref = self
            .last_offsets
            .get_mut(output_index)
            .expect("output known to be installed")
            .get_mut(&partition)
            .expect("partition known to be installed");

        let last_offset = *last_offset_ref;
        let offset_as_i64: i64 = offset.offset.try_into().expect("offset to be < i64::MAX");
        if offset_as_i64 <= last_offset {
            info!(
                source_id = self.id.to_string(),
                worker_id = self.worker_id,
                num_workers = self.worker_count,
                "kafka message before expected offset: \
                 source {} (reading topic {}, partition {}, output {}) \
                 received offset {} expected offset {:?}",
                self.source_name,
                self.topic_name,
                partition,
                output_index,
                offset.offset,
                last_offset + 1,
            );
            // We explicitly should not consume the message as we have already processed it.
            None
        } else {
            *last_offset_ref = offset_as_i64;

            let ts = Partitioned::new_singleton(RangeBound::exact(partition), offset);
            Some((message, ts, Diff::ONE))
        }
    }
}
1183
1184fn construct_source_message(
1185    msg: &BorrowedMessage<'_>,
1186    metadata_columns: &[KafkaMetadataKind],
1187) -> (
1188    Result<SourceMessage, KafkaHeaderParseError>,
1189    (PartitionId, MzOffset),
1190) {
1191    let pid = msg.partition();
1192    let Ok(offset) = u64::try_from(msg.offset()) else {
1193        panic!(
1194            "got negative offset ({}) from otherwise non-error'd kafka message",
1195            msg.offset()
1196        );
1197    };
1198
1199    let mut metadata = Row::default();
1200    let mut packer = metadata.packer();
1201    for kind in metadata_columns {
1202        match kind {
1203            KafkaMetadataKind::Partition => packer.push(Datum::from(pid)),
1204            KafkaMetadataKind::Offset => packer.push(Datum::UInt64(offset)),
1205            KafkaMetadataKind::Timestamp => {
1206                let ts = msg
1207                    .timestamp()
1208                    .to_millis()
1209                    .expect("kafka sources always have upstream_time");
1210
1211                let d: Datum = DateTime::from_timestamp_millis(ts)
1212                    .and_then(|dt| {
1213                        let ct: Option<CheckedTimestamp<NaiveDateTime>> =
1214                            dt.naive_utc().try_into().ok();
1215                        ct
1216                    })
1217                    .into();
1218                packer.push(d)
1219            }
1220            KafkaMetadataKind::Header { key, use_bytes } => {
1221                match msg.headers() {
1222                    Some(headers) => {
1223                        let d = headers
1224                            .iter()
1225                            .filter(|header| header.key == key)
1226                            .last()
1227                            .map(|header| match header.value {
1228                                Some(v) => {
1229                                    if *use_bytes {
1230                                        Ok(Datum::Bytes(v))
1231                                    } else {
1232                                        match str::from_utf8(v) {
1233                                            Ok(str) => Ok(Datum::String(str)),
1234                                            Err(_) => Err(KafkaHeaderParseError::Utf8Error {
1235                                                key: key.clone(),
1236                                                raw: v.to_vec(),
1237                                            }),
1238                                        }
1239                                    }
1240                                }
1241                                None => Ok(Datum::Null),
1242                            })
1243                            .unwrap_or_else(|| {
1244                                Err(KafkaHeaderParseError::KeyNotFound { key: key.clone() })
1245                            });
1246                        match d {
1247                            Ok(d) => packer.push(d),
1248                            //abort with a definite error when the header is not found or cannot be parsed correctly
1249                            Err(err) => return (Err(err), (pid, offset.into())),
1250                        }
1251                    }
1252                    None => packer.push(Datum::Null),
1253                }
1254            }
1255            KafkaMetadataKind::Headers => {
1256                packer.push_list_with(|r| {
1257                    if let Some(headers) = msg.headers() {
1258                        for header in headers.iter() {
1259                            match header.value {
1260                                Some(v) => r.push_list_with(|record_row| {
1261                                    record_row.push(Datum::String(header.key));
1262                                    record_row.push(Datum::Bytes(v));
1263                                }),
1264                                None => r.push_list_with(|record_row| {
1265                                    record_row.push(Datum::String(header.key));
1266                                    record_row.push(Datum::Null);
1267                                }),
1268                            }
1269                        }
1270                    }
1271                });
1272            }
1273        }
1274    }
1275
1276    let key = match msg.key() {
1277        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1278        None => Row::pack([Datum::Null]),
1279    };
1280    let value = match msg.payload() {
1281        Some(bytes) => Row::pack([Datum::Bytes(bytes)]),
1282        None => Row::pack([Datum::Null]),
1283    };
1284    (
1285        Ok(SourceMessage {
1286            key,
1287            value,
1288            metadata,
1289        }),
1290        (pid, offset.into()),
1291    )
1292}
1293
/// Wrapper around a partition containing the underlying consumer
struct PartitionConsumer {
    /// The partition id with which this consumer is associated.
    pid: PartitionId,
    /// The underlying Kafka partition queue, split off from the main consumer
    /// so this partition can be polled independently.
    partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
}
1301
1302impl PartitionConsumer {
1303    /// Creates a new partition consumer from underlying Kafka consumer
1304    fn new(
1305        pid: PartitionId,
1306        partition_queue: PartitionQueue<TunnelingClientContext<GlueConsumerContext>>,
1307    ) -> Self {
1308        PartitionConsumer {
1309            pid,
1310            partition_queue,
1311        }
1312    }
1313
1314    /// Returns the next message to process for this partition (if any).
1315    ///
1316    /// The outer `Result` represents irrecoverable failures, the inner one can and will
1317    /// be transformed into empty values.
1318    ///
1319    /// The inner `Option` represents if there is a message to process.
1320    fn get_next_message(&self) -> Result<Option<(BorrowedMessage<'_>, PartitionId)>, KafkaError> {
1321        match self.partition_queue.poll(Duration::from_millis(0)) {
1322            Some(Ok(msg)) => Ok(Some((msg, self.pid))),
1323            Some(Err(err)) => Err(err),
1324            _ => Ok(None),
1325        }
1326    }
1327
1328    /// Return the partition id for this PartitionConsumer
1329    fn pid(&self) -> PartitionId {
1330        self.pid
1331    }
1332}
1333
/// An implementation of [`ConsumerContext`] that forwards statistics to the
/// worker
struct GlueConsumerContext {
    /// Wakes the source operator when rdkafka produces new events.
    notificator: Arc<Notify>,
    /// Channel over which raw statistics JSON blobs are forwarded to the
    /// source operator (see `KafkaSourceReader::update_stats`).
    stats_tx: crossbeam_channel::Sender<Jsonb>,
    /// Wrapped Materialize client context that rdkafka callbacks are
    /// forwarded to.
    inner: MzClientContext,
}
1341
1342impl ClientContext for GlueConsumerContext {
1343    fn stats_raw(&self, statistics: &[u8]) {
1344        match Jsonb::from_slice(statistics) {
1345            Ok(statistics) => {
1346                self.stats_tx
1347                    .send(statistics)
1348                    .expect("timely operator hung up while Kafka source active");
1349                self.activate();
1350            }
1351            Err(e) => error!("failed decoding librdkafka statistics JSON: {}", e),
1352        };
1353    }
1354
1355    // The shape of the rdkafka *Context traits require us to forward to the `MzClientContext`
1356    // implementation.
1357    fn log(&self, level: rdkafka::config::RDKafkaLogLevel, fac: &str, log_message: &str) {
1358        self.inner.log(level, fac, log_message)
1359    }
1360    fn error(&self, error: rdkafka::error::KafkaError, reason: &str) {
1361        self.inner.error(error, reason)
1362    }
1363}
1364
impl GlueConsumerContext {
    /// Wakes up the source operator via the shared notifier so it picks up
    /// newly forwarded statistics.
    fn activate(&self) {
        self.notificator.notify_one();
    }
}
1370
// No consumer-specific callbacks are needed; the trait's default
// implementations suffice.
impl ConsumerContext for GlueConsumerContext {}
1372
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Duration;

    use mz_kafka_util::client::create_new_client_config_simple;
    use rdkafka::consumer::{BaseConsumer, Consumer};
    use rdkafka::{Message, Offset, TopicPartitionList};
    use uuid::Uuid;

    // Splitting off a partition queue with an `Offset` that is not `Offset::Beginning` seems to
    // lead to a race condition where sometimes we receive messages from polling the main consumer
    // instead of on the partition queue. This can be surfaced by running the test in a loop (in
    // the dataflow directory) using:
    //
    // cargo stress --lib --release source::kafka::tests::reproduce_kafka_queue_issue
    //
    // cargo-stress can be installed via `cargo install cargo-stress`
    //
    // You need to set up a topic "queue-test" with 1000 "hello" messages in it. Obviously, running
    // this test requires a running Kafka instance at localhost:9092.
    #[mz_ore::test]
    #[ignore]
    fn demonstrate_kafka_queue_race_condition() -> Result<(), anyhow::Error> {
        let topic_name = "queue-test";
        let pid = 0;

        let mut kafka_config = create_new_client_config_simple();
        kafka_config.set("bootstrap.servers", "localhost:9092".to_string());
        kafka_config.set("enable.auto.commit", "false");
        kafka_config.set("group.id", Uuid::new_v4().to_string());
        // A tiny fetch size makes messages trickle in, giving the race many
        // chances to manifest.
        kafka_config.set("fetch.message.max.bytes", "100");
        let consumer: BaseConsumer<_> = kafka_config.create()?;

        let consumer = Arc::new(consumer);

        let mut partition_list = TopicPartitionList::new();
        // Using Offset:Beginning here will work fine, only Offset:Offset(0) leads to the race
        // condition.
        partition_list.add_partition_offset(topic_name, pid, Offset::Offset(0))?;

        consumer.assign(&partition_list)?;

        let partition_queue = consumer
            .split_partition_queue(topic_name, pid)
            .expect("missing partition queue");

        let expected_messages = 1_000;

        let mut common_queue_count = 0;
        let mut partition_queue_count = 0;

        loop {
            // After the partition queue has yielded its first message, nothing
            // should ever appear on the common (main consumer) queue again.
            if let Some(msg) = consumer.poll(Duration::from_millis(0)) {
                match msg {
                    Ok(msg) => {
                        let _payload =
                            std::str::from_utf8(msg.payload().expect("missing payload"))?;
                        if partition_queue_count > 0 {
                            anyhow::bail!(
                                "Got message from common queue after we internally switched to partition queue."
                            );
                        }

                        common_queue_count += 1;
                    }
                    Err(err) => anyhow::bail!("{}", err),
                }
            }

            match partition_queue.poll(Duration::from_millis(0)) {
                Some(Ok(msg)) => {
                    let _payload = std::str::from_utf8(msg.payload().expect("missing payload"))?;
                    partition_queue_count += 1;
                }
                Some(Err(err)) => anyhow::bail!("{}", err),
                _ => (),
            }

            if (common_queue_count + partition_queue_count) == expected_messages {
                break;
            }
        }

        assert!(
            common_queue_count == 0,
            "Got {} out of {} messages from common queue. Partition queue: {}",
            common_queue_count,
            expected_messages,
            partition_queue_count
        );

        Ok(())
    }
}
1468
1469/// Fetches the list of partitions and their corresponding high watermark.
1470fn fetch_partition_info<C: ConsumerContext>(
1471    consumer: &BaseConsumer<C>,
1472    topic: &str,
1473    fetch_timeout: Duration,
1474) -> Result<BTreeMap<PartitionId, HighWatermark>, GetPartitionsError> {
1475    let pids = get_partitions(consumer.client(), topic, fetch_timeout)?;
1476
1477    let mut offset_requests = TopicPartitionList::with_capacity(pids.len());
1478    for pid in pids {
1479        offset_requests.add_partition_offset(topic, pid, Offset::End)?;
1480    }
1481
1482    let offset_responses = consumer.offsets_for_times(offset_requests, fetch_timeout)?;
1483
1484    let mut result = BTreeMap::new();
1485    for entry in offset_responses.elements() {
1486        let offset = match entry.offset() {
1487            Offset::Offset(offset) => offset,
1488            offset => Err(anyhow!("unexpected high watermark offset: {offset:?}"))?,
1489        };
1490
1491        let pid = entry.partition();
1492        let watermark = offset.try_into().expect("invalid negative offset");
1493        result.insert(pid, watermark);
1494    }
1495
1496    Ok(result)
1497}
1498
/// An update produced by the metadata fetcher.
///
/// Either a fresh snapshot of the topic's partitions and their high
/// watermarks, or an error classified by whether the source can recover
/// from it.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
enum MetadataUpdate {
    /// The current IDs and high watermarks of all topic partitions.
    Partitions(BTreeMap<PartitionId, HighWatermark>),
    /// A transient error.
    ///
    /// Transient errors stall the source until their cause has been resolved.
    TransientError(HealthStatus),
    /// A definite error.
    ///
    /// Definite errors cannot be recovered from. They poison the source until the end of time.
    DefiniteError(SourceError),
}
1513
1514impl MetadataUpdate {
1515    /// Return the upstream frontier resulting from the metadata update, if any.
1516    fn upstream_frontier(&self) -> Option<Antichain<KafkaTimestamp>> {
1517        match self {
1518            Self::Partitions(partitions) => {
1519                let max_pid = partitions.keys().last().copied();
1520                let lower = max_pid
1521                    .map(RangeBound::after)
1522                    .unwrap_or(RangeBound::NegInfinity);
1523                let future_ts =
1524                    Partitioned::new_range(lower, RangeBound::PosInfinity, MzOffset::from(0));
1525
1526                let mut frontier = Antichain::from_elem(future_ts);
1527                for (pid, high_watermark) in partitions {
1528                    frontier.insert(Partitioned::new_singleton(
1529                        RangeBound::exact(*pid),
1530                        MzOffset::from(*high_watermark),
1531                    ));
1532                }
1533
1534                Some(frontier)
1535            }
1536            Self::DefiniteError(_) => Some(Antichain::new()),
1537            Self::TransientError(_) => None,
1538        }
1539    }
1540}
1541
/// Errors that can occur when extracting a header value from a Kafka message.
#[derive(Debug, thiserror::Error)]
pub enum KafkaHeaderParseError {
    /// The requested header key was absent from the message headers.
    #[error("A header with key '{key}' was not found in the message headers")]
    KeyNotFound { key: String },
    /// The header was present but its bytes were not valid UTF-8.
    #[error(
        "Found ill-formed byte sequence in header '{key}' that cannot be decoded as valid utf-8 (original bytes: {raw:x?})"
    )]
    Utf8Error { key: String, raw: Vec<u8> },
}
1551
1552/// Render the metadata fetcher of a Kafka source.
1553///
1554/// The metadata fetcher is a single-worker operator that is responsible for periodically fetching
1555/// the Kafka topic metadata (partition IDs and high watermarks) and making it available as a
1556/// Timely stream.
1557fn render_metadata_fetcher<'scope>(
1558    scope: Scope<'scope, KafkaTimestamp>,
1559    connection: KafkaSourceConnection,
1560    config: RawSourceCreationConfig,
1561) -> (
1562    StreamVec<'scope, KafkaTimestamp, (mz_repr::Timestamp, MetadataUpdate)>,
1563    StreamVec<'scope, KafkaTimestamp, Probe<KafkaTimestamp>>,
1564    PressOnDropButton,
1565) {
1566    let active_worker_id = usize::cast_from(config.id.hashed());
1567    let is_active_worker = active_worker_id % scope.peers() == scope.index();
1568
1569    let resume_upper = Antichain::from_iter(
1570        config
1571            .source_resume_uppers
1572            .values()
1573            .map(|uppers| uppers.iter().map(KafkaTimestamp::decode_row))
1574            .flatten(),
1575    );
1576
1577    let name = format!("KafkaMetadataFetcher({})", config.id);
1578    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
1579
1580    let (metadata_output, metadata_stream) =
1581        builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1582    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
1583
1584    let button = builder.build(move |caps| async move {
1585        if !is_active_worker {
1586            return;
1587        }
1588
1589        let [metadata_cap, probe_cap] = caps.try_into().unwrap();
1590
1591        let client_id = connection.client_id(
1592            config.config.config_set(),
1593            &config.config.connection_context,
1594            config.id,
1595        );
1596        let KafkaSourceConnection {
1597            connection,
1598            topic,
1599            topic_metadata_refresh_interval,
1600            ..
1601        } = connection;
1602
1603        let consumer: Result<BaseConsumer<_>, _> = connection
1604            .create_with_context(
1605                &config.config,
1606                MzClientContext::default(),
1607                &btreemap! {
1608                    // Use the user-configured topic metadata refresh
1609                    // interval.
1610                    "topic.metadata.refresh.interval.ms" =>
1611                        topic_metadata_refresh_interval
1612                        .as_millis()
1613                        .to_string(),
1614                    // Allow Kafka monitoring tools to identify this
1615                    // consumer.
1616                    "client.id" => format!("{client_id}-metadata"),
1617                },
1618                InTask::Yes,
1619            )
1620            .await;
1621
1622        let consumer = match consumer {
1623            Ok(consumer) => consumer,
1624            Err(e) => {
1625                let msg = format!(
1626                    "failed creating kafka metadata consumer: {}",
1627                    e.display_with_causes()
1628                );
1629                let status_update = HealthStatusUpdate::halting(msg, None);
1630                let status = match e {
1631                    ContextCreationError::Ssh(_) => HealthStatus::ssh(status_update),
1632                    _ => HealthStatus::kafka(status_update),
1633                };
1634                let error = MetadataUpdate::TransientError(status);
1635                let timestamp = (config.now_fn)().into();
1636                metadata_output.give(&metadata_cap, (timestamp, error));
1637
1638                // IMPORTANT: wedge forever until the `SuspendAndRestart` is processed.
1639                // Returning would incorrectly present to the remap operator as progress to the
1640                // empty frontier which would be incorrectly recorded to the remap shard.
1641                std::future::pending::<()>().await;
1642                unreachable!("pending future never returns");
1643            }
1644        };
1645
1646        let (tx, mut rx) = mpsc::unbounded_channel();
1647        spawn_metadata_thread(config, consumer, topic, tx);
1648
1649        let mut prev_upstream_frontier = resume_upper;
1650
1651        while let Some((timestamp, mut update)) = rx.recv().await {
1652            if prev_upstream_frontier.is_empty() {
1653                return;
1654            }
1655
1656            if let Some(upstream_frontier) = update.upstream_frontier() {
1657                // Topics are identified by name but it's possible that a user recreates a topic
1658                // with the same name. Ideally we'd want to catch all of these cases and
1659                // immediately error out the source, since the data is effectively gone.
1660                // Unfortunately this is not possible without something like KIP-516.
1661                //
1662                // The best we can do is check whether the upstream frontier regressed. This tells
1663                // us that the topic was recreated and now contains fewer offsets and/or fewer
1664                // partitions. Note that we are not able to detect topic recreation if neither of
1665                // the two are true.
1666                if !PartialOrder::less_equal(&prev_upstream_frontier, &upstream_frontier) {
1667                    let error = SourceError {
1668                        error: SourceErrorDetails::Other("topic was recreated".into()),
1669                    };
1670                    update = MetadataUpdate::DefiniteError(error);
1671                }
1672            }
1673
1674            if let Some(upstream_frontier) = update.upstream_frontier() {
1675                prev_upstream_frontier = upstream_frontier.clone();
1676
1677                let probe = Probe {
1678                    probe_ts: timestamp,
1679                    upstream_frontier,
1680                };
1681                probe_output.give(&probe_cap, probe);
1682            }
1683
1684            metadata_output.give(&metadata_cap, (timestamp, update));
1685        }
1686    });
1687
1688    (metadata_stream, probe_stream, button.press_on_drop())
1689}
1690
/// Spawns a dedicated OS thread that periodically fetches the partition
/// metadata (IDs and high watermarks) of `topic` and forwards the resulting
/// [`MetadataUpdate`]s, tagged with a probe timestamp, over `tx`.
///
/// The thread exits once the receiving side of `tx` has been dropped.
fn spawn_metadata_thread<C: ConsumerContext>(
    config: RawSourceCreationConfig,
    consumer: BaseConsumer<TunnelingClientContext<C>>,
    topic: String,
    tx: mpsc::UnboundedSender<(mz_repr::Timestamp, MetadataUpdate)>,
) {
    // Linux thread names are limited to 15 characters; keep the prefix short so
    // as much of the source ID as possible fits.
    thread::Builder::new()
        .name(format!("kfk-mtdt-{}", config.id))
        .spawn(move || {
            trace!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka metadata thread: starting..."
            );

            // Pace the fetch loop by the source's timestamp interval.
            let timestamp_interval = config.timestamp_interval;
            let mut ticker = probe::Ticker::new(move || timestamp_interval, config.now_fn);

            loop {
                let probe_ts = ticker.tick_blocking();
                let result = fetch_partition_info(
                    &consumer,
                    &topic,
                    config
                        .config
                        .parameters
                        .kafka_timeout_config
                        .fetch_metadata_timeout,
                );
                trace!(
                    source_id = config.id.to_string(),
                    worker_id = config.worker_id,
                    num_workers = config.worker_count,
                    "kafka metadata thread: metadata fetch result: {:?}",
                    result
                );
                let update = match result {
                    Ok(partitions) => {
                        trace!(
                            source_id = config.id.to_string(),
                            worker_id = config.worker_id,
                            num_workers = config.worker_count,
                            "kafka metadata thread: fetched partition metadata info",
                        );

                        MetadataUpdate::Partitions(partitions)
                    }
                    // A missing topic cannot be recovered from: report a
                    // definite error.
                    Err(GetPartitionsError::TopicDoesNotExist) => {
                        let error = SourceError {
                            error: SourceErrorDetails::Other("topic was deleted".into()),
                        };
                        MetadataUpdate::DefiniteError(error)
                    }
                    // Everything else is treated as transient; report both the
                    // Kafka and the SSH tunnel health status.
                    Err(e) => {
                        let kafka_status = Some(HealthStatusUpdate::stalled(
                            format!("{}", e.display_with_causes()),
                            None,
                        ));

                        let ssh_status = consumer.client().context().tunnel_status();
                        let ssh_status = match ssh_status {
                            SshTunnelStatus::Running => Some(HealthStatusUpdate::running()),
                            SshTunnelStatus::Errored(e) => {
                                Some(HealthStatusUpdate::stalled(e, None))
                            }
                        };

                        MetadataUpdate::TransientError(HealthStatus {
                            kafka: kafka_status,
                            ssh: ssh_status,
                        })
                    }
                };

                // A failed send means the receiver (the metadata fetcher
                // operator) has shut down; stop fetching.
                if tx.send((probe_ts, update)).is_err() {
                    break;
                }
            }

            info!(
                source_id = config.id.to_string(),
                worker_id = config.worker_id,
                num_workers = config.worker_count,
                "kafka metadata thread: receiver has gone away; shutting down."
            )
        })
        .unwrap();
}