Skip to main content

mz_storage/sink/
kafka.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Code to render the sink dataflow of a [`KafkaSinkConnection`]. The dataflow consists
11//! of two operators in order to take advantage of all the available workers.
12//!
13//! ```text
14//!        ┏━━━━━━━━━━━━━━┓
15//!        ┃   persist    ┃
16//!        ┃    source    ┃
17//!        ┗━━━━━━┯━━━━━━━┛
18//!               │ stream of arrangement batches (trace reader dropped)
19//!               │
20//!        ┏━━━━━━v━━━━━━┓
21//!        ┃    row      ┃ walks each batch's cursor and emits one
22//!        ┃   encoder   ┃ encoded `KafkaMessage` per DiffPair
23//!        ┗━━━━━━┯━━━━━━┛
24//!               │ encoded data
25//!               │
26//!        ┏━━━━━━v━━━━━━┓
27//!        ┃    kafka    ┃ (single worker)
28//!        ┃    sink     ┃
29//!        ┗━━┯━━━━━━━━┯━┛
30//!   records │        │ uppers
31//!      ╭────v──╮ ╭───v──────╮
32//!      │ data  │ │ progress │  <- records and uppers are produced
33//!      │ topic │ │  topic   │     transactionally to both topics
34//!      ╰───────╯ ╰──────────╯
35//! ```
36//!
37//! # Encoding
38//!
39//! One part of the dataflow deals with encoding the rows that we read from persist. The encoder
40//! walks the input arrangement's batches via
41//! [`mz_interchange::envelopes::for_each_diff_pair`], producing one encoded `KafkaMessage` per
42//! `DiffPair` observed at each `(key, timestamp)`. An initialization step first ensures that the
43//! schemas are published to the Schema Registry.
44//!
45//! # Sinking
46//!
47//! The other part of the dataflow, and what this module mostly deals with, is interacting with the
48//! Kafka cluster in order to transactionally commit batches (sets of records associated with a
49//! frontier). All the processing happens in a single worker and so all previously encoded records
50//! go through an exchange in order to arrive at the chosen worker. We may be able to improve this
51//! in the future by committing disjoint partitions of the key space for independent workers but
52//! for now we do the simple thing.
53//!
54//! ## Retries
55//!
56//! All of the retry logic heavy lifting is offloaded to `librdkafka` since it already implements
57//! the required behavior[1]. In particular we only ever enqueue records to its send queue and
58//! eventually call `commit_transaction` which will ensure that all queued messages are
59//! successfully delivered before the transaction is reported as committed.
60//!
61//! The only error that is possible during sending is that the queue is full. We are purposefully
62//! NOT handling this error and simply configure `librdkafka` with a very large queue. The reason
//! for this choice is that the only way to handle such an error ourselves would be to queue
64//! it, and there isn't a good argument about two small queues being better than one big one. If we
65//! reach the queue limit we simply error out the entire sink dataflow and start over.
66//!
67//! # Error handling
68//!
69//! Both the encoding operator and the sinking operator can produce a transient error that is wired
70//! up with our health monitoring and will trigger a restart of the sink dataflow.
71//!
72//! [1]: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#message-reliability
73
74use std::cell::RefCell;
75use std::cmp::Ordering;
76use std::collections::BTreeMap;
77use std::future::Future;
78use std::rc::Rc;
79use std::sync::atomic::AtomicU64;
80use std::sync::{Arc, Weak};
81use std::time::Duration;
82
83use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
84use crate::metrics::sink::kafka::KafkaSinkMetrics;
85use crate::render::sinks::{PkViolationWarner, SinkBatchStream, SinkRender};
86use crate::statistics::SinkStatistics;
87use crate::storage_state::StorageState;
88use anyhow::{Context, anyhow, bail};
89use differential_dataflow::{AsCollection, Hashable, VecCollection};
90use futures::StreamExt;
91use maplit::btreemap;
92use mz_expr::MirScalarExpr;
93use mz_interchange::avro::AvroEncoder;
94use mz_interchange::encode::Encode;
95use mz_interchange::envelopes::{dbz_format, for_each_diff_pair};
96use mz_interchange::json::JsonEncoder;
97use mz_interchange::text_binary::{BinaryEncoder, TextEncoder};
98use mz_kafka_util::admin::EnsureTopicConfig;
99use mz_kafka_util::client::{
100    DEFAULT_FETCH_METADATA_TIMEOUT, GetPartitionsError, MzClientContext, TimeoutConfig,
101    TunnelingClientContext,
102};
103use mz_ore::cast::CastFrom;
104use mz_ore::collections::CollectionExt;
105use mz_ore::error::ErrorExt;
106use mz_ore::future::InTask;
107use mz_ore::soft_assert_or_log;
108use mz_ore::task::{self, AbortOnDropHandle};
109use mz_persist_client::Diagnostics;
110use mz_persist_client::write::WriteHandle;
111use mz_persist_types::codec_impls::UnitSchema;
112use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
113use mz_storage_client::sink::progress_key::ProgressKey;
114use mz_storage_types::StorageDiff;
115use mz_storage_types::configuration::StorageConfiguration;
116use mz_storage_types::controller::CollectionMetadata;
117use mz_storage_types::dyncfgs::{
118    KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS, KAFKA_SINK_BATCH_NUM_MESSAGES,
119    KAFKA_SINK_BATCH_SIZE, KAFKA_SINK_MESSAGE_MAX_BYTES, SINK_ENSURE_TOPIC_CONFIG,
120    SINK_PROGRESS_SEARCH,
121};
122use mz_storage_types::errors::{ContextCreationError, ContextCreationErrorExt, DataflowError};
123use mz_storage_types::sinks::{
124    KafkaSinkConnection, KafkaSinkFormatType, SinkEnvelope, StorageSinkDesc,
125};
126use mz_storage_types::sources::SourceData;
127use mz_timely_util::antichain::AntichainExt;
128use mz_timely_util::builder_async::{
129    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
130};
131use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
132use rdkafka::error::KafkaError;
133use rdkafka::message::{Header, OwnedHeaders, ToBytes};
134use rdkafka::producer::{BaseRecord, Producer, ThreadedProducer};
135use rdkafka::types::RDKafkaErrorCode;
136use rdkafka::{Message, Offset, Statistics, TopicPartitionList};
137use serde::{Deserialize, Deserializer, Serialize, Serializer};
138use timely::PartialOrder;
139use timely::container::CapacityContainerBuilder;
140use timely::dataflow::StreamVec;
141use timely::dataflow::channels::pact::{Exchange, Pipeline};
142use timely::dataflow::operators::vec::{Map, ToStream};
143use timely::dataflow::operators::{CapabilitySet, Concatenate};
144use timely::progress::{Antichain, Timestamp as _};
145use tokio::sync::watch;
146use tokio::time::{self, MissedTickBehavior};
147use tracing::{debug, error, info, warn};
148
149impl<'scope> SinkRender<'scope> for KafkaSinkConnection {
150    fn get_key_indices(&self) -> Option<&[usize]> {
151        self.key_desc_and_indices
152            .as_ref()
153            .map(|(_desc, indices)| indices.as_slice())
154    }
155
156    fn get_relation_key_indices(&self) -> Option<&[usize]> {
157        self.relation_key_indices.as_deref()
158    }
159
160    fn render_sink(
161        &self,
162        storage_state: &mut StorageState,
163        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
164        sink_id: GlobalId,
165        batches: SinkBatchStream<'scope>,
166        key_is_synthetic: bool,
167        // TODO(benesch): errors should stream out through the sink,
168        // if we figure out a protocol for that.
169        _err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
170    ) -> (
171        StreamVec<'scope, Timestamp, HealthStatusMessage>,
172        Vec<PressOnDropButton>,
173    ) {
174        let scope = batches.scope();
175
176        let write_handle = {
177            let persist = Arc::clone(&storage_state.persist_clients);
178            let shard_meta = sink.to_storage_metadata.clone();
179            async move {
180                let client = persist.open(shard_meta.persist_location).await?;
181                let handle = client
182                    .open_writer(
183                        shard_meta.data_shard,
184                        Arc::new(shard_meta.relation_desc),
185                        Arc::new(UnitSchema),
186                        Diagnostics::from_purpose("sink handle"),
187                    )
188                    .await?;
189                Ok(handle)
190            }
191        };
192
193        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
194        storage_state
195            .sink_write_frontiers
196            .insert(sink_id, Rc::clone(&write_frontier));
197
198        let (encoded, encode_status, encode_token) = encode_collection(
199            format!("kafka-{sink_id}-{}-encode", self.format.get_format_name()),
200            batches,
201            sink.envelope,
202            self.clone(),
203            storage_state.storage_configuration.clone(),
204            sink_id,
205            sink.from,
206            key_is_synthetic,
207        );
208
209        let metrics = storage_state.metrics.get_kafka_sink_metrics(sink_id);
210        let statistics = storage_state
211            .aggregated_statistics
212            .get_sink(&sink_id)
213            .expect("statistics initialized")
214            .clone();
215
216        let (sink_status, sink_token) = sink_collection(
217            format!("kafka-{sink_id}-sink"),
218            encoded,
219            sink_id,
220            self.clone(),
221            storage_state.storage_configuration.clone(),
222            sink,
223            metrics,
224            statistics,
225            write_handle,
226            write_frontier,
227        );
228
229        let running_status = Some(HealthStatusMessage {
230            id: None,
231            update: HealthStatusUpdate::Running,
232            namespace: StatusNamespace::Kafka,
233        })
234        .to_stream(scope);
235
236        let status = scope.concatenate([running_status, encode_status, sink_status]);
237
238        (status, vec![encode_token, sink_token])
239    }
240}
241
242struct TransactionalProducer {
243    /// The task name used for any blocking calls spawned onto the tokio threadpool.
244    task_name: String,
245    /// The topic where all the updates go.
246    data_topic: String,
247    /// The topic where all the upper frontiers go.
248    progress_topic: String,
249    /// The key each progress record is associated with.
250    progress_key: ProgressKey,
251    /// The version of this sink, used to fence out previous versions from writing.
252    sink_version: u64,
253    /// The number of partitions in the target topic.
254    partition_count: Arc<AtomicU64>,
255    /// A task to periodically refresh the partition count.
256    _partition_count_task: AbortOnDropHandle<()>,
257    /// The underlying Kafka producer.
258    producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
259    /// A handle to the metrics associated with this sink.
260    statistics: SinkStatistics,
261    /// The number of messages staged for the currently open transactions. It is reset to zero
262    /// every time a transaction commits.
263    staged_messages: u64,
264    /// The total number bytes staged for the currently open transactions. It is reset to zero
265    /// every time a transaction commits.
266    staged_bytes: u64,
267    /// The timeout to use for network operations.
268    socket_timeout: Duration,
269    /// The timeout to use for committing transactions.
270    transaction_timeout: Duration,
271}
272
273impl TransactionalProducer {
274    /// Initializes a transcational producer for the sink identified by `sink_id`. After this call
275    /// returns it is guranteed that all previous `TransactionalProducer` instances for the same
276    /// sink have been fenced out (i.e `init_transations()` has been called successfully).
277    async fn new(
278        sink_id: GlobalId,
279        connection: &KafkaSinkConnection,
280        storage_configuration: &StorageConfiguration,
281        metrics: Arc<KafkaSinkMetrics>,
282        statistics: SinkStatistics,
283        sink_version: u64,
284    ) -> Result<(Self, Antichain<mz_repr::Timestamp>), ContextCreationError> {
285        let client_id = connection.client_id(
286            storage_configuration.config_set(),
287            &storage_configuration.connection_context,
288            sink_id,
289        );
290        let transactional_id =
291            connection.transactional_id(&storage_configuration.connection_context, sink_id);
292
293        let timeout_config = &storage_configuration.parameters.kafka_timeout_config;
294        let mut options = BTreeMap::new();
295        // Ensure that messages are sinked in order and without duplicates. Note that this only
296        // applies to a single instance of a producer - in the case of restarts, all bets are off
297        // and full exactly once support is required.
298        options.insert("enable.idempotence", "true".into());
299        // Use the compression type requested by the user.
300        options.insert(
301            "compression.type",
302            connection.compression_type.to_librdkafka_option().into(),
303        );
304        // Set the maximum buffer size limit. We don't want to impose anything lower than the max
305        // here as the operator has nothing better to do with the data than to buffer them.
306        options.insert("queue.buffering.max.kbytes", "2147483647".into());
307        // Disable the default buffer limit of 100k messages. We don't want to impose any limit
308        // here as the operator has nothing better to do with the data than to buffer them.
309        options.insert("queue.buffering.max.messages", "0".into());
310        // Make the Kafka producer wait at least 10 ms before sending out MessageSets
311        options.insert("queue.buffering.max.ms", format!("{}", 10));
312        // Time out transactions after 60 seconds
313        options.insert(
314            "transaction.timeout.ms",
315            format!("{}", timeout_config.transaction_timeout.as_millis()),
316        );
317        // Use the transactional ID requested by the user.
318        options.insert("transactional.id", transactional_id);
319        // Allow Kafka monitoring tools to identify this producer.
320        options.insert("client.id", client_id);
321        // We want to be notified regularly with statistics
322        options.insert("statistics.interval.ms", "1000".into());
323        // Maximum size of a single produced message, controlled by dyncfg so
324        // operators can raise the limit when messages exceed librdkafka's 1MB
325        // default.
326        options.insert(
327            "message.max.bytes",
328            format!(
329                "{}",
330                KAFKA_SINK_MESSAGE_MAX_BYTES.get(storage_configuration.config_set())
331            ),
332        );
333        // Maximum size (bytes) of a produced MessageSet.
334        options.insert(
335            "batch.size",
336            format!(
337                "{}",
338                KAFKA_SINK_BATCH_SIZE.get(storage_configuration.config_set())
339            ),
340        );
341        // Maximum number of messages batched in one MessageSet.
342        options.insert(
343            "batch.num.messages",
344            format!(
345                "{}",
346                KAFKA_SINK_BATCH_NUM_MESSAGES.get(storage_configuration.config_set())
347            ),
348        );
349
350        let ctx = MzClientContext::default();
351
352        let stats_receiver = ctx.subscribe_statistics();
353        let task_name = format!("kafka_sink_metrics_collector:{sink_id}");
354        task::spawn(
355            || &task_name,
356            collect_statistics(stats_receiver, Arc::clone(&metrics)),
357        );
358
359        let producer: ThreadedProducer<_> = connection
360            .connection
361            .create_with_context(storage_configuration, ctx, &options, InTask::Yes)
362            .await?;
363
364        // The partition count is fixed up after we ensure the topic exists.
365        let partition_count = Arc::new(AtomicU64::new(0));
366        let update_partition_count = {
367            let partition_count = Arc::clone(&partition_count);
368            let metrics = Arc::clone(&metrics);
369            Arc::new(move |pc| {
370                partition_count.store(pc, std::sync::atomic::Ordering::SeqCst);
371                metrics.partition_count.set(pc);
372            })
373        };
374
375        // Start a task that will keep the partition count up to date in the
376        // background.
377        let partition_count_task = task::spawn(
378            || format!("kafka_sink_producer_fetch_metadata_loop:{sink_id}"),
379            fetch_partition_count_loop(
380                producer.clone(),
381                sink_id,
382                connection.topic.clone(),
383                connection.topic_metadata_refresh_interval,
384                Arc::clone(&update_partition_count),
385            ),
386        );
387
388        let task_name = format!("kafka_sink_producer:{sink_id}");
389        let progress_key = ProgressKey::new(sink_id);
390
391        let producer = Self {
392            task_name,
393            data_topic: connection.topic.clone(),
394            partition_count,
395            _partition_count_task: partition_count_task.abort_on_drop(),
396            progress_topic: connection
397                .progress_topic(&storage_configuration.connection_context)
398                .into_owned(),
399            progress_key,
400            sink_version,
401            producer,
402            statistics,
403            staged_messages: 0,
404            staged_bytes: 0,
405            socket_timeout: timeout_config.socket_timeout,
406            transaction_timeout: timeout_config.transaction_timeout,
407        };
408
409        let timeout = timeout_config.socket_timeout;
410        producer
411            .spawn_blocking(move |p| p.init_transactions(timeout))
412            .await?;
413
414        // We have just called init_transactions, which means that we have fenced out all previous
415        // transactional producers, making it safe to determine the resume upper.
416        let progress = determine_sink_progress(
417            sink_id,
418            connection,
419            storage_configuration,
420            Arc::clone(&metrics),
421        )
422        .await?;
423
424        let resume_upper = match progress {
425            Some(progress) => {
426                if sink_version < progress.version {
427                    return Err(ContextCreationError::Other(anyhow!(
428                        "Fenced off by newer version of the sink. ours={} theirs={}",
429                        sink_version,
430                        progress.version
431                    )));
432                }
433                progress.frontier
434            }
435            None => {
436                mz_storage_client::sink::ensure_kafka_topic(
437                    connection,
438                    storage_configuration,
439                    &connection.topic,
440                    &connection.topic_options,
441                    EnsureTopicConfig::Skip,
442                )
443                .await?;
444                Antichain::from_elem(Timestamp::minimum())
445            }
446        };
447
448        // At this point the topic must exist and so we can query for its
449        // partition count. Even though we have a background task to fetch the
450        // partition count, we do this synchronously to ensure we don't attempt
451        // to produce any messages with our initial partition count of 0.
452        let partition_count =
453            fetch_partition_count(&producer.producer, sink_id, &connection.topic).await?;
454        update_partition_count(partition_count);
455
456        Ok((producer, resume_upper))
457    }
458
459    /// Runs the blocking operation `f` on the producer in the tokio threadpool and checks for SSH
460    /// status in case of failure.
461    async fn spawn_blocking<F, R>(&self, f: F) -> Result<R, ContextCreationError>
462    where
463        F: FnOnce(
464                ThreadedProducer<TunnelingClientContext<MzClientContext>>,
465            ) -> Result<R, KafkaError>
466            + Send
467            + 'static,
468        R: Send + 'static,
469    {
470        let producer = self.producer.clone();
471        task::spawn_blocking(|| &self.task_name, move || f(producer))
472            .await
473            .check_ssh_status(self.producer.context())
474    }
475
476    async fn begin_transaction(&self) -> Result<(), ContextCreationError> {
477        self.spawn_blocking(|p| p.begin_transaction()).await
478    }
479
480    /// Synchronously puts the provided message to librdkafka's send queue. This method only
481    /// returns an error if the queue is full. Handling this error by buffering the message and
482    /// retrying is equivalent to adjusting the maximum number of queued items in rdkafka so it is
483    /// adviced that callers only handle this error in order to apply backpressure to the rest of
484    /// the system.
485    fn send(
486        &mut self,
487        message: &KafkaMessage,
488        time: Timestamp,
489        diff: Diff,
490    ) -> Result<(), KafkaError> {
491        assert_eq!(diff, Diff::ONE, "invalid sink update");
492
493        let mut headers = OwnedHeaders::new().insert(Header {
494            key: "materialize-timestamp",
495            value: Some(time.to_string().as_bytes()),
496        });
497        for header in &message.headers {
498            // Headers that start with `materialize-` are reserved for our
499            // internal use, so we silently drop any such user-specified
500            // headers. While this behavior is documented, it'd be a nicer UX to
501            // send a warning or error somewhere. Unfortunately sinks don't have
502            // anywhere user-visible to send errors. See database-issues#5148.
503            if header.key.starts_with("materialize-") {
504                continue;
505            }
506
507            headers = headers.insert(Header {
508                key: header.key.as_str(),
509                value: header.value.as_ref(),
510            });
511        }
512
513        let pc = self
514            .partition_count
515            .load(std::sync::atomic::Ordering::SeqCst);
516        let partition = Some(i32::try_from(message.hash % pc).unwrap());
517
518        let record = BaseRecord {
519            topic: &self.data_topic,
520            key: message.key.as_ref(),
521            payload: message.value.as_ref(),
522            headers: Some(headers),
523            partition,
524            timestamp: None,
525            delivery_opaque: (),
526        };
527        let key_size = message.key.as_ref().map(|k| k.len()).unwrap_or(0);
528        let value_size = message.value.as_ref().map(|k| k.len()).unwrap_or(0);
529        let headers_size = message
530            .headers
531            .iter()
532            .map(|h| h.key.len() + h.value.as_ref().map(|v| v.len()).unwrap_or(0))
533            .sum::<usize>();
534        let record_size = u64::cast_from(key_size + value_size + headers_size);
535        self.statistics.inc_messages_staged_by(1);
536        self.staged_messages += 1;
537        self.statistics.inc_bytes_staged_by(record_size);
538        self.staged_bytes += record_size;
539        self.producer.send(record).map_err(|(e, _)| e)
540    }
541
542    /// Commits all the staged updates of the currently open transaction plus a progress record
543    /// describing `upper` to the progress topic.
544    async fn commit_transaction(
545        &mut self,
546        upper: Antichain<Timestamp>,
547    ) -> Result<(), ContextCreationError> {
548        let progress = ProgressRecord {
549            frontier: upper,
550            version: self.sink_version,
551        };
552        let payload = serde_json::to_vec(&progress).expect("infallible");
553        let record = BaseRecord::to(&self.progress_topic)
554            .payload(&payload)
555            .key(&self.progress_key);
556        self.producer.send(record).map_err(|(e, _)| e)?;
557
558        fail::fail_point!("kafka_sink_commit_transaction");
559
560        let timeout = self.transaction_timeout;
561        match self
562            .spawn_blocking(move |p| p.commit_transaction(timeout))
563            .await
564        {
565            Ok(()) => {
566                self.statistics
567                    .inc_messages_committed_by(self.staged_messages);
568                self.statistics.inc_bytes_committed_by(self.staged_bytes);
569                self.staged_messages = 0;
570                self.staged_bytes = 0;
571                Ok(())
572            }
573            Err(ContextCreationError::KafkaError(KafkaError::Transaction(err))) => {
574                // Make one attempt at aborting the transaction before letting the error percolate
575                // up and the process exit. Aborting allows the consumers of the topic to skip over
576                // any messages we've written in the transaction, so it's polite to do... but if it
577                // fails, the transaction will be aborted either when fenced out by a future
578                // version of this producer or by the broker-side timeout.
579                if err.txn_requires_abort() {
580                    let timeout = self.socket_timeout;
581                    self.spawn_blocking(move |p| p.abort_transaction(timeout))
582                        .await?;
583                }
584                Err(ContextCreationError::KafkaError(KafkaError::Transaction(
585                    err,
586                )))
587            }
588            Err(err) => Err(err),
589        }
590    }
591}
592
593/// Listens for statistics updates from librdkafka and updates our Prometheus metrics.
594async fn collect_statistics(
595    mut receiver: watch::Receiver<Statistics>,
596    metrics: Arc<KafkaSinkMetrics>,
597) {
598    let mut outbuf_cnt: i64 = 0;
599    let mut outbuf_msg_cnt: i64 = 0;
600    let mut waitresp_cnt: i64 = 0;
601    let mut waitresp_msg_cnt: i64 = 0;
602    let mut txerrs: u64 = 0;
603    let mut txretries: u64 = 0;
604    let mut req_timeouts: u64 = 0;
605    let mut connects: i64 = 0;
606    let mut disconnects: i64 = 0;
607    while receiver.changed().await.is_ok() {
608        let stats = receiver.borrow();
609        for broker in stats.brokers.values() {
610            outbuf_cnt += broker.outbuf_cnt;
611            outbuf_msg_cnt += broker.outbuf_msg_cnt;
612            waitresp_cnt += broker.waitresp_cnt;
613            waitresp_msg_cnt += broker.waitresp_msg_cnt;
614            txerrs += broker.txerrs;
615            txretries += broker.txretries;
616            req_timeouts += broker.req_timeouts;
617            connects += broker.connects.unwrap_or(0);
618            disconnects += broker.disconnects.unwrap_or(0);
619        }
620        metrics.rdkafka_msg_cnt.set(stats.msg_cnt);
621        metrics.rdkafka_msg_size.set(stats.msg_size);
622        metrics.rdkafka_txmsgs.set(stats.txmsgs);
623        metrics.rdkafka_txmsg_bytes.set(stats.txmsg_bytes);
624        metrics.rdkafka_tx.set(stats.tx);
625        metrics.rdkafka_tx_bytes.set(stats.tx_bytes);
626        metrics.rdkafka_outbuf_cnt.set(outbuf_cnt);
627        metrics.rdkafka_outbuf_msg_cnt.set(outbuf_msg_cnt);
628        metrics.rdkafka_waitresp_cnt.set(waitresp_cnt);
629        metrics.rdkafka_waitresp_msg_cnt.set(waitresp_msg_cnt);
630        metrics.rdkafka_txerrs.set(txerrs);
631        metrics.rdkafka_txretries.set(txretries);
632        metrics.rdkafka_req_timeouts.set(req_timeouts);
633        metrics.rdkafka_connects.set(connects);
634        metrics.rdkafka_disconnects.set(disconnects);
635    }
636}
637
638/// A message to produce to Kafka.
639#[derive(Debug, Clone, Serialize, Deserialize)]
640struct KafkaMessage {
641    /// A hash of the key that can be used for partitioning.
642    hash: u64,
643    /// The message key.
644    key: Option<Vec<u8>>,
645    /// The message value.
646    value: Option<Vec<u8>>,
647    /// Message headers.
648    headers: Vec<KafkaHeader>,
649}
650
651/// A header to attach to a Kafka message.
652#[derive(Debug, Clone, Serialize, Deserialize)]
653struct KafkaHeader {
654    /// The header key.
655    key: String,
656    /// The header value.
657    value: Option<Vec<u8>>,
658}
659
660/// Sinks a collection of encoded rows to Kafka.
661///
662/// This operator exchanges all updates to a single worker by hashing on the given sink `id`.
663///
664/// Updates are sent in ascending timestamp order.
665fn sink_collection<'scope>(
666    name: String,
667    input: VecCollection<'scope, Timestamp, KafkaMessage, Diff>,
668    sink_id: GlobalId,
669    connection: KafkaSinkConnection,
670    storage_configuration: StorageConfiguration,
671    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
672    metrics: KafkaSinkMetrics,
673    statistics: SinkStatistics,
674    write_handle: impl Future<
675        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
676    > + 'static,
677    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
678) -> (
679    StreamVec<'scope, Timestamp, HealthStatusMessage>,
680    PressOnDropButton,
681) {
682    let scope = input.scope();
683    let mut builder = AsyncOperatorBuilder::new(name.clone(), input.inner.scope());
684
685    // We want exactly one worker to send all the data to the sink topic.
686    let hashed_id = sink_id.hashed();
687    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
688    let buffer_min_capacity =
689        KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS.handle(storage_configuration.config_set());
690
691    let mut input = builder.new_disconnected_input(input.inner, Exchange::new(move |_| hashed_id));
692
693    let as_of = sink.as_of.clone();
694    let sink_version = sink.version;
695    let (button, errors) = builder.build_fallible(move |_caps| {
696        Box::pin(async move {
697            if !is_active_worker {
698                write_frontier.borrow_mut().clear();
699                return Ok(());
700            }
701
702            fail::fail_point!("kafka_sink_creation_error", |_| Err(
703                ContextCreationError::Other(anyhow::anyhow!("synthetic error"))
704            ));
705
706            let mut write_handle = write_handle.await?;
707
708            let metrics = Arc::new(metrics);
709
710            let (mut producer, resume_upper) = TransactionalProducer::new(
711                sink_id,
712                &connection,
713                &storage_configuration,
714                Arc::clone(&metrics),
715                statistics,
716                sink_version,
717            )
718            .await?;
719
720            // The input has overcompacted if
721            let overcompacted =
722                // ..we have made some progress in the past
723                *resume_upper != [Timestamp::minimum()] &&
724                // ..but the since frontier is now beyond that
725                !PartialOrder::less_equal(&as_of, &resume_upper);
726            if overcompacted {
727                let err = format!(
728                    "{name}: input compacted past resume upper: as_of {}, resume_upper: {}",
729                    as_of.pretty(),
730                    resume_upper.pretty()
731                );
732                // This would normally be an assertion but because it can happen after a
733                // Materialize backup/restore we log an error so that it appears on Sentry but
734                // leaves the rest of the objects in the cluster unaffected.
735                error!("{err}");
736                return Err(anyhow!("{err}").into());
737            }
738
739            info!(
740                "{name}: as_of: {}, resume upper: {}",
741                as_of.pretty(),
742                resume_upper.pretty()
743            );
744
745            // The section below relies on TotalOrder for correctness so we'll work with timestamps
746            // directly to make sure this doesn't compile if someone attempts to make this operator
747            // generic over partial orders in the future.
748            let Some(mut upper) = resume_upper.clone().into_option() else {
749                write_frontier.borrow_mut().clear();
750                return Ok(());
751            };
752
753            let mut deferred_updates = vec![];
754            let mut extra_updates = vec![];
755            // We must wait until we have data to commit before starting a transaction because
756            // Kafka doesn't have a heartbeating mechanism to keep a transaction open indefinitely.
757            // This flag tracks whether we have started the transaction.
758            let mut transaction_begun = false;
759            while let Some(event) = input.next().await {
760                match event {
761                    Event::Data(_cap, batch) => {
762                        for (message, time, diff) in batch {
763                            // We want to publish updates in time order and we know that we have
764                            // already committed all times not beyond `upper`. Therefore, if this
765                            // update happens *exactly* at upper then it is the minimum pending
766                            // time and so emitting it now will not violate the timestamp publish
767                            // order. This optimization is load bearing because it is the mechanism
768                            // by which we incrementally stream the initial snapshot out to Kafka
769                            // instead of buffering it all in memory first. This argument doesn't
770                            // hold for partially ordered time because many different timestamps
771                            // can be *exactly* at upper but we can't know ahead of time which one
772                            // will be advanced in the next progress message.
773                            match upper.cmp(&time) {
774                                Ordering::Less => deferred_updates.push((message, time, diff)),
775                                Ordering::Equal => {
776                                    if !transaction_begun {
777                                        producer.begin_transaction().await?;
778                                        transaction_begun = true;
779                                    }
780                                    producer.send(&message, time, diff)?;
781                                }
782                                Ordering::Greater => continue,
783                            }
784                        }
785                    }
786                    Event::Progress(progress) => {
787                        // Ignore progress updates before our resumption frontier
788                        if !PartialOrder::less_equal(&resume_upper, &progress) {
789                            continue;
790                        }
791                        // Also ignore progress updates until we are past the as_of frontier. This
792                        // is to avoid the following pathological scenario:
793                        // 1. Sink gets instantiated with an as_of = {10}, resume_upper = {0}.
794                        //    `progress` initially jumps at {10}, then the snapshot appears at time
795                        //    10.
796                        // 2. `progress` would normally advance to say {11} and we would commit the
797                        //    snapshot but clusterd crashes instead.
798                        // 3. A new cluster restarts the sink with an earlier as_of, say {5}. This
799                        //    is valid, the earlier as_of has strictly more information. The
800                        //    snapshot now appears at time 5.
801                        //
802                        // If we were to commit an empty transaction in step 1 and advanced the
803                        // resume_upper to {10} then in step 3 we would ignore the snapshot that
804                        // now appears at 5 completely. So it is important to only start committing
805                        // transactions after we're strictly beyond the as_of.
806                        // TODO(petrosagg): is this logic an indication of us holding something
807                        // wrong elsewhere? Investigate.
808                        // Note: !PartialOrder::less_than(as_of, progress) would not be equivalent
809                        // nor correct for partially ordered times.
810                        if !as_of.iter().all(|t| !progress.less_equal(t)) {
811                            continue;
812                        }
813                        if !transaction_begun {
814                            producer.begin_transaction().await?;
815                        }
816
817                        extra_updates.extend(
818                            deferred_updates
819                                .extract_if(.., |(_, time, _)| !progress.less_equal(time)),
820                        );
821                        // Shrink after draining items out, so the call actually
822                        // reduces capacity in the oversized-buffer scenario
823                        // (e.g. progress topic was deleted and resume upper is 0).
824                        deferred_updates.shrink_to(buffer_min_capacity.get());
825                        extra_updates.sort_unstable_by(|a, b| a.1.cmp(&b.1));
826
827                        for (message, time, diff) in extra_updates.drain(..) {
828                            producer.send(&message, time, diff)?;
829                        }
830                        extra_updates.shrink_to(buffer_min_capacity.get());
831
832                        debug!("{name}: committing transaction for {}", progress.pretty());
833                        producer.commit_transaction(progress.clone()).await?;
834                        transaction_begun = false;
835                        let mut expect_upper = write_handle.shared_upper();
836                        loop {
837                            if PartialOrder::less_equal(&progress, &expect_upper) {
838                                // The frontier has already been advanced as far as necessary.
839                                break;
840                            }
841                            // TODO(sinks): include the high water mark in the output topic for
842                            // the messages we've published, if and when we allow reads to the sink
843                            // directly, to allow monitoring the progress of the sink in terms of
844                            // the output system.
845                            const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
846                            match write_handle
847                                .compare_and_append(EMPTY, expect_upper, progress.clone())
848                                .await
849                                .expect("valid usage")
850                            {
851                                Ok(()) => break,
852                                Err(mismatch) => {
853                                    expect_upper = mismatch.current;
854                                }
855                            }
856                        }
857                        write_frontier.borrow_mut().clone_from(&progress);
858                        match progress.into_option() {
859                            Some(new_upper) => upper = new_upper,
860                            None => break,
861                        }
862                    }
863                }
864            }
865            Ok(())
866        })
867    });
868
869    let statuses = errors.map(|error: Rc<ContextCreationError>| {
870        let hint = match *error {
871            ContextCreationError::KafkaError(KafkaError::Transaction(ref e)) => {
872                if e.is_retriable() && e.code() == RDKafkaErrorCode::OperationTimedOut {
873                    let hint = "If you're running a single Kafka broker, ensure that the configs \
874                        transaction.state.log.replication.factor, transaction.state.log.min.isr, \
875                        and offsets.topic.replication.factor are set to 1 on the broker";
876                    Some(hint.to_owned())
877                } else {
878                    None
879                }
880            }
881            _ => None,
882        };
883
884        HealthStatusMessage {
885            id: None,
886            update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), hint),
887            namespace: if matches!(*error, ContextCreationError::Ssh(_)) {
888                StatusNamespace::Ssh
889            } else {
890                StatusNamespace::Kafka
891            },
892        }
893    });
894
895    (statuses, button.press_on_drop())
896}
897
/// Determines the latest progress record from the specified topic for the given
/// progress key.
///
/// Returns `Ok(None)` if the progress topic does not exist, or if no progress record keyed
/// by `sink_id` is found below the topic's high water mark. Before searching, this calls
/// `ensure_kafka_topic` for the progress topic according to `SINK_ENSURE_TOPIC_CONFIG`.
///
/// The search runs on a blocking task that is cancelled cooperatively: if the future
/// returned by this function is dropped, the task's weak cancellation token loses its
/// strong reference and the search fails at its next position check.
///
/// IMPORTANT: to achieve exactly once guarantees, the producer that will resume
/// production at the returned timestamp *must* have called `init_transactions`
/// prior to calling this method.
///
/// # Errors
///
/// Returns an error in any of these cases:
/// * either progress consumer cannot be created;
/// * the progress topic cannot be ensured;
/// * its metadata or watermarks cannot be fetched;
/// * it does not contain exactly one partition;
/// * `progress_search` fails.
async fn determine_sink_progress(
    sink_id: GlobalId,
    connection: &KafkaSinkConnection,
    storage_configuration: &StorageConfiguration,
    metrics: Arc<KafkaSinkMetrics>,
) -> Result<Option<ProgressRecord>, ContextCreationError> {
    // ****************************** WARNING ******************************
    // Be VERY careful when editing the code in this function. It is very easy
    // to accidentally introduce a correctness or liveness bug when refactoring
    // this code.
    // ****************************** WARNING ******************************

    let TimeoutConfig {
        fetch_metadata_timeout,
        progress_record_fetch_timeout,
        ..
    } = storage_configuration.parameters.kafka_timeout_config;

    let client_id = connection.client_id(
        storage_configuration.config_set(),
        &storage_configuration.connection_context,
        sink_id,
    );
    let group_id = connection.progress_group_id(&storage_configuration.connection_context, sink_id);
    let progress_topic = connection
        .progress_topic(&storage_configuration.connection_context)
        .into_owned();
    let progress_topic_options = &connection.connection.progress_topic_options;
    // Progress records for this sink carry this key. Records with any other key belong to
    // other sinks sharing the same progress topic and are skipped during the search.
    let progress_key = ProgressKey::new(sink_id);

    let common_options = btreemap! {
        // Consumer group ID, which may have been overridden by the user. librdkafka requires this,
        // even though we'd prefer to disable the consumer group protocol entirely.
        "group.id" => group_id,
        // Allow Kafka monitoring tools to identify this consumer.
        "client.id" => client_id,
        "enable.auto.commit" => "false".into(),
        "auto.offset.reset" => "earliest".into(),
        // The fetch loop below needs EOF notifications to reliably detect that we have reached the
        // high watermark.
        "enable.partition.eof" => "true".into(),
    };

    // Construct two clients, using the read committed and read uncommitted isolation levels
    // respectively. See the comment below for an explanation of why we need both.
    let progress_client_read_committed: BaseConsumer<_> = {
        let mut opts = common_options.clone();
        opts.insert("isolation.level", "read_committed".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    let progress_client_read_uncommitted: BaseConsumer<_> = {
        let mut opts = common_options;
        opts.insert("isolation.level", "read_uncommitted".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    // Keep a handle to the client context so that `check_ssh_status` can be applied to the
    // result of the blocking task below (the consumers themselves move into that task).
    let ctx = Arc::clone(progress_client_read_committed.client().context());

    // Ensure the progress topic exists.
    let ensure_topic_config =
        match &*SINK_ENSURE_TOPIC_CONFIG.get(storage_configuration.config_set()) {
            "skip" => EnsureTopicConfig::Skip,
            "check" => EnsureTopicConfig::Check,
            "alter" => EnsureTopicConfig::Alter,
            _ => {
                tracing::warn!(
                    topic = progress_topic,
                    "unexpected value for ensure-topic-config; skipping checks"
                );
                EnsureTopicConfig::Skip
            }
        };
    mz_storage_client::sink::ensure_kafka_topic(
        connection,
        storage_configuration,
        &progress_topic,
        progress_topic_options,
        ensure_topic_config,
    )
    .await
    .add_context("error registering kafka progress topic for sink")?;

    // We are about to spawn a blocking task that cannot be aborted by simply calling .abort() on
    // its handle but we must be able to cancel it promptly so as to not leave long running
    // operations around when interest in this task is lost. To accomplish this we create a shared
    // token of which a weak reference is given to the task and a strong reference is held by the
    // parent task. The task periodically checks if its weak reference is still valid before
    // continuing its work.
    let parent_token = Arc::new(());
    let child_token = Arc::downgrade(&parent_token);
    let task_name = format!("get_latest_ts:{sink_id}");
    // When enabled, exponentially-growing suffixes of the topic are searched before the whole
    // topic (see the comment on `start_indices` below).
    let sink_progress_search = SINK_PROGRESS_SEARCH.get(storage_configuration.config_set());
    let result = task::spawn_blocking(|| task_name, move || {
        let progress_topic = progress_topic.as_ref();
        // Ensure the progress topic has exactly one partition. Kafka only
        // guarantees ordering within a single partition, and we need a strict
        // order on the progress messages we read and write.
        let partitions = match mz_kafka_util::client::get_partitions(
            progress_client_read_committed.client(),
            progress_topic,
            fetch_metadata_timeout,
        ) {
            Ok(partitions) => partitions,
            Err(GetPartitionsError::TopicDoesNotExist) => {
                // The progress topic doesn't exist, which indicates there is
                // no committed timestamp.
                return Ok(None);
            }
            e => e.with_context(|| {
                format!(
                    "Unable to fetch metadata about progress topic {}",
                    progress_topic
                )
            })?,
        };
        if partitions.len() != 1 {
            bail!(
                    "Progress topic {} should contain a single partition, but instead contains {} partitions",
                    progress_topic, partitions.len(),
                );
        }
        let partition = partitions.into_element();

        // We scan from the beginning and see if we can find a progress record. We have
        // to do it like this because Kafka Control Batches mess with offsets. We
        // therefore cannot simply take the last offset from the back and expect a
        // progress message there. With a transactional producer, the OffsetTail(1) will
        // not point to a progress message but a control message. With aborted
        // transactions, there might even be a lot of garbage at the end of the
        // topic or in between.

        metrics.consumed_progress_records.set(0);

        // First, determine the current high water mark for the progress topic.
        // This is the position our `progress_client` consumer *must* reach
        // before we can conclude that we've seen the latest progress record for
        // the specified `progress_key`. A safety argument:
        //
        //   * Our caller has initialized transactions before calling this
        //     method, which prevents the prior incarnation of this sink from
        //     committing any further progress records.
        //
        //   * We use `read_uncommitted` isolation to ensure that we fetch the
        //     true high water mark for the topic, even if there are pending
        //     transactions in the topic. If we used the `read_committed`
        //     isolation level, we'd instead get the "last stable offset" (LSO),
        //     which is the offset of the first message in an open transaction,
        //     which might not include the last progress message committed for
        //     this sink! (While the caller of this function has fenced out
        //     older producers for this sink, *other* sinks writing using the
        //     same progress topic might have long-running transactions that
        //     hold back the LSO.)
        //
        //   * If another sink spins up and fences out the producer for this
        //     incarnation of the sink, we may not see the latest progress
        //     record... but since the producer has been fenced out, it will be
        //     unable to act on our stale information.
        //
        let (lo, hi) = progress_client_read_uncommitted
            .fetch_watermarks(progress_topic, partition, fetch_metadata_timeout)
            .map_err(|e| {
                anyhow!(
                    "Failed to fetch metadata while reading from progress topic: {}",
                    e
                )
            })?;

        // This topic might be long, but the desired offset will usually be right near the end.
        // Instead of always scanning through the entire topic, we scan through exponentially-growing
        // suffixes of it. (Because writes are ordered, the largest progress record in any suffix,
        // if present, is the global max.) If we find it in one of our suffixes, we've saved at least
        // an order of magnitude of work; if we don't, we've added at most a constant factor.
        let mut start_indices = vec![lo];
        if sink_progress_search {
            let mut lookback = hi.saturating_sub(lo) / 10;
            while lookback >= 20_000 {
                start_indices.push(hi - lookback);
                lookback /= 10;
            }
        }
        // Try the smallest suffix first; the full range starting at `lo` is searched last.
        for lo in start_indices.into_iter().rev() {
            if let Some(found) = progress_search(
                &progress_client_read_committed,
                progress_record_fetch_timeout,
                progress_topic,
                partition,
                lo,
                hi,
                progress_key.clone(),
                Weak::clone(&child_token),
                Arc::clone(&metrics)
            )? {
                return Ok(Some(found));
            }
        }
        Ok(None)
    }).await.check_ssh_status(&ctx);
    // Keep expressing interest in the computation until after we've received its result.
    drop(parent_token);
    result
}
1114
/// Scans the progress topic partition from offset `lo` up to the high water mark `hi`.
///
/// Returns the last progress record keyed by `progress_key` in that range, or `None` if
/// the range contains no such record.
///
/// Reads use the `read_committed` consumer so that records from aborted transactions are
/// never observed. Updates the `consumed_progress_records` and
/// `outstanding_progress_records` metrics as it goes.
///
/// # Errors
///
/// Returns an error in any of these cases:
/// * seeking fails, or the consumer position cannot be determined;
/// * the search is cancelled, because `child_token` has no strong references left;
/// * polling times out before reaching `hi`;
/// * a record fails to parse;
/// * the frontier regresses between two consecutive records for `progress_key`.
fn progress_search<C: ConsumerContext + 'static>(
    progress_client_read_committed: &BaseConsumer<C>,
    progress_record_fetch_timeout: Duration,
    progress_topic: &str,
    partition: i32,
    lo: i64,
    hi: i64,
    progress_key: ProgressKey,
    child_token: Weak<()>,
    metrics: Arc<KafkaSinkMetrics>,
) -> anyhow::Result<Option<ProgressRecord>> {
    // Seek to the beginning of the given range in the progress topic.
    let mut tps = TopicPartitionList::new();
    tps.add_partition(progress_topic, partition);
    tps.set_partition_offset(progress_topic, partition, Offset::Offset(lo))?;
    progress_client_read_committed
        .assign(&tps)
        .with_context(|| {
            format!(
                "Error seeking in progress topic {}:{}",
                progress_topic, partition
            )
        })?;

    // Helper to get the progress consumer's current position. It doubles as the
    // cancellation check: it fails once the parent has dropped its strong token.
    let get_position = || {
        if child_token.strong_count() == 0 {
            bail!("operation cancelled");
        }
        let position = progress_client_read_committed
            .position()?
            .find_partition(progress_topic, partition)
            .ok_or_else(|| {
                anyhow!(
                    "No position info found for progress topic {}",
                    progress_topic
                )
            })?
            .offset();
        let position = match position {
            Offset::Offset(position) => position,
            // An invalid offset indicates the consumer has not yet read a
            // message. Since we assigned the consumer to the start of the
            // searched range (`lo`), it's safe to return `lo` here, which
            // indicates the position before the first possible message.
            //
            // Note that it's important to return `lo` and not the minimum
            // possible offset (i.e., zero) in order to break out of the loop
            // if the range is empty but `lo` is greater than zero.
            Offset::Invalid => lo,
            _ => bail!(
                "Consumer::position returned offset of wrong type: {:?}",
                position
            ),
        };
        // Record the outstanding number of progress records that remain to be processed
        let outstanding = u64::try_from(std::cmp::max(0, hi - position)).unwrap();
        metrics.outstanding_progress_records.set(outstanding);
        Ok(position)
    };

    info!("fetching latest progress record for {progress_key}, lo/hi: {lo}/{hi}");

    // Read messages until the consumer is positioned at or beyond the high
    // water mark.
    //
    // We use `read_committed` isolation to ensure we don't see progress
    // records for transactions that did not commit. This means we have to
    // wait for the LSO to progress to the high water mark `hi`, which means
    // waiting for any open transactions for other sinks using the same
    // progress topic to complete. We set a short transaction timeout (10s)
    // to ensure we never need to wait more than 10s.
    //
    // Note that the stall time on the progress topic is not a function of
    // transaction size. We've designed our transactions so that the
    // progress record is always written last, after all the data has been
    // written, and so the window of time in which the progress topic has an
    // open transaction is quite small. The only vulnerability is if another
    // sink using the same progress topic crashes in that small window
    // between writing the progress record and committing the transaction,
    // in which case we have to wait out the transaction timeout.
    //
    // Important invariant: we only exit this loop successfully (i.e., not
    // returning an error) if we have positive proof of a position at or
    // beyond the high water mark. To make this invariant easy to check, the
    // only `break` in the loop is the position check at the top of the body;
    // do not add any others.
    let mut last_progress: Option<ProgressRecord> = None;
    loop {
        let current_position = get_position()?;

        if current_position >= hi {
            // consumer is at or beyond the high water mark and has read enough messages
            break;
        }

        let message = match progress_client_read_committed.poll(progress_record_fetch_timeout) {
            Some(Ok(message)) => message,
            Some(Err(KafkaError::PartitionEOF(_))) => {
                // No message, but the consumer's position may have advanced
                // past a transaction control message that positions us at
                // or beyond the high water mark. Go around the loop again
                // to check.
                continue;
            }
            Some(Err(e)) => bail!("failed to fetch progress message {e}"),
            None => {
                bail!(
                    "timed out while waiting to reach high water mark of non-empty \
                        topic {progress_topic}:{partition}, lo/hi: {lo}/{hi}, current position: {current_position}"
                );
            }
        };

        if message.key() != Some(progress_key.to_bytes()) {
            // This is a progress message for a different sink.
            continue;
        }

        metrics.consumed_progress_records.inc();

        // A record for this sink without a payload carries no frontier; skip it.
        let Some(payload) = message.payload() else {
            continue;
        };
        let progress = parse_progress_record(payload)?;

        // Successive progress records for one sink must never move the frontier backwards.
        match last_progress {
            Some(last_progress)
                if !PartialOrder::less_equal(&last_progress.frontier, &progress.frontier) =>
            {
                bail!(
                    "upper regressed in topic {progress_topic}:{partition} from {:?} to {:?}",
                    &last_progress.frontier,
                    &progress.frontier,
                );
            }
            _ => last_progress = Some(progress),
        }
    }

    // If we get here, we are assured that we've read all messages up to
    // the high water mark. Therefore `last_progress` contains the most
    // recent progress record, if any, for the sink under consideration.
    Ok(last_progress)
}
1260
/// This is the legacy struct that used to be emitted as part of a transactional produce and
/// contains the largest timestamp within the batch committed. Since it is just a timestamp it
/// cannot encode the fact that a sink has finished and deviates from upper frontier semantics.
/// Materialize no longer produces this record but it's possible that we encounter this in topics
/// written by older versions. In those cases we convert it into upper semantics by stepping the
/// timestamp forward.
///
/// The `timestamp` field distinguishes three cases:
/// * `None`: the field was absent from the JSON object (via `#[serde(default)]`);
/// * `Some(None)`: the field was present and explicitly `null`;
/// * `Some(Some(t))`: the field held the timestamp `t`.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct LegacyProgressRecord {
    // Double Option to tell apart an omitted field from one set to null explicitly
    // https://github.com/serde-rs/serde/issues/984
    #[serde(default, deserialize_with = "deserialize_some")]
    pub timestamp: Option<Option<Timestamp>>,
}
1274
1275// Any value that is present is considered Some value, including null.
1276fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
1277where
1278    T: Deserialize<'de>,
1279    D: Deserializer<'de>,
1280{
1281    Deserialize::deserialize(deserializer).map(Some)
1282}
1283
/// This struct is emitted as part of a transactional produce, and contains the upper frontier of
/// the batch committed. It is used to recover the frontier a sink needs to resume at.
///
/// It is read back with `serde_json`. `frontier` is encoded as a flat array of the antichain's
/// elements, so an empty array denotes the empty frontier.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ProgressRecord {
    #[serde(
        deserialize_with = "deserialize_frontier",
        serialize_with = "serialize_frontier"
    )]
    pub frontier: Antichain<Timestamp>,
    // Defaults to 0 when the field is absent (presumably records written before the field
    // was introduced).
    #[serde(default)]
    pub version: u64,
}
1296fn serialize_frontier<S>(frontier: &Antichain<Timestamp>, serializer: S) -> Result<S::Ok, S::Error>
1297where
1298    S: Serializer,
1299{
1300    Serialize::serialize(frontier.elements(), serializer)
1301}
1302
1303fn deserialize_frontier<'de, D>(deserializer: D) -> Result<Antichain<Timestamp>, D::Error>
1304where
1305    D: Deserializer<'de>,
1306{
1307    let times: Vec<Timestamp> = Deserialize::deserialize(deserializer)?;
1308    Ok(Antichain::from(times))
1309}
1310
1311fn parse_progress_record(payload: &[u8]) -> Result<ProgressRecord, anyhow::Error> {
1312    Ok(match serde_json::from_slice::<ProgressRecord>(payload) {
1313        Ok(progress) => progress,
1314        // If we fail to deserialize we might be reading a legacy progress record
1315        Err(_) => match serde_json::from_slice::<LegacyProgressRecord>(payload) {
1316            Ok(LegacyProgressRecord {
1317                timestamp: Some(Some(time)),
1318            }) => ProgressRecord {
1319                frontier: Antichain::from_elem(time.step_forward()),
1320                version: 0,
1321            },
1322            Ok(LegacyProgressRecord {
1323                timestamp: Some(None),
1324            }) => ProgressRecord {
1325                frontier: Antichain::new(),
1326                version: 0,
1327            },
1328            _ => match std::str::from_utf8(payload) {
1329                Ok(payload) => bail!("invalid progress record: {payload}"),
1330                Err(_) => bail!("invalid progress record bytes: {payload:?}"),
1331            },
1332        },
1333    })
1334}
1335
1336/// Fetches the partition count for the identified topic.
1337async fn fetch_partition_count(
1338    producer: &ThreadedProducer<TunnelingClientContext<MzClientContext>>,
1339    sink_id: GlobalId,
1340    topic_name: &str,
1341) -> Result<u64, anyhow::Error> {
1342    let meta = task::spawn_blocking(|| format!("kafka_sink_fetch_partition_count:{sink_id}"), {
1343        let producer = producer.clone();
1344        move || {
1345            producer
1346                .client()
1347                .fetch_metadata(None, DEFAULT_FETCH_METADATA_TIMEOUT)
1348        }
1349    })
1350    .await
1351    .check_ssh_status(producer.context())?;
1352
1353    match meta.topics().iter().find(|t| t.name() == topic_name) {
1354        Some(topic) => {
1355            let partition_count = u64::cast_from(topic.partitions().len());
1356            if partition_count == 0 {
1357                bail!("topic {topic_name} has an impossible partition count of zero");
1358            }
1359            Ok(partition_count)
1360        }
1361        None => bail!("topic {topic_name} does not exist"),
1362    }
1363}
1364
1365/// Fetches the partition count for the identified topic at the specified
1366/// interval.
1367///
1368/// When an updated partition count is discovered, invokes
1369/// `update_partition_count` with the new partition count.
1370async fn fetch_partition_count_loop<F>(
1371    producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
1372    sink_id: GlobalId,
1373    topic_name: String,
1374    interval: Duration,
1375    update_partition_count: Arc<F>,
1376) where
1377    F: Fn(u64),
1378{
1379    let mut interval = time::interval(interval);
1380    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
1381    loop {
1382        interval.tick().await;
1383        match fetch_partition_count(&producer, sink_id, &topic_name).await {
1384            Ok(pc) => update_partition_count(pc),
1385            Err(e) => {
1386                warn!(%sink_id, "failed updating partition count: {e}");
1387                continue;
1388            }
1389        };
1390    }
1391}
1392
/// Walks each arrangement batch and emits encoded Kafka messages, one per
/// `DiffPair` observed at each `(key, timestamp)`.
///
/// When `key_is_synthetic`, the batch keys are per-row hashes used only for
/// worker distribution; the emitted `KafkaMessage` uses no key in that case.
///
/// Before consuming any input, the operator builds its encoders:
/// - a key encoder, only if a key description and key format are both
///   configured;
/// - a value encoder from `connection.format`.
///
/// For Avro formats this connects to the schema registry and publishes the
/// `<topic>-key` / `<topic>-value` schemas. Schemas are therefore (re)published
/// every time the sink is rendered.
///
/// Arguments:
/// - `name`: name of the async operator.
/// - `batches`: the arrangement batches to encode.
/// - `envelope`: the envelope applied to each `DiffPair`:
///   - `Upsert` emits only the `after` row as the value;
///   - `Debezium` packs the pair with `dbz_format` and also switches the value
///     encoder into Debezium mode;
///   - `Append` is not valid for Kafka sinks and hits `unreachable!` if
///     encountered.
/// - `connection`: provides:
///   - the key/value descriptions and formats;
///   - the topic name, used to derive schema registry subjects;
///   - the optional headers column index and `PARTITION BY` expression.
/// - `storage_configuration`: used to connect to the schema registry.
/// - `sink_id` / `from_id`: passed to the primary-key violation warner.
/// - `key_is_synthetic`: when true, no primary-key violation warner is created.
///
/// Returns:
/// - the encoded messages, each emitted at its source timestamp with a diff of
///   `Diff::ONE`;
/// - a stream of halting health statuses (namespace `Kafka`), built from any
///   error returned while constructing the encoders;
/// - the operator's button wrapped by `press_on_drop`. Presumably this shuts the
///   operator down when dropped; confirm against `PressOnDropButton`.
fn encode_collection<'scope>(
    name: String,
    batches: SinkBatchStream<'scope>,
    envelope: SinkEnvelope,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
    sink_id: GlobalId,
    from_id: GlobalId,
    key_is_synthetic: bool,
) -> (
    VecCollection<'scope, Timestamp, KafkaMessage, Diff>,
    StreamVec<'scope, Timestamp, HealthStatusMessage>,
    PressOnDropButton,
) {
    let mut builder = AsyncOperatorBuilder::new(name, batches.scope());

    let (output, stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let mut input = builder.new_input_for(batches, Pipeline, &output);

    let (button, errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            // Exactly one output was declared above, so there is exactly one
            // capability set.
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let key_desc = connection
                .key_desc_and_indices
                .as_ref()
                .map(|(desc, _indices)| desc.clone());
            let value_desc = connection.value_desc;

            // A key encoder exists only when both a key description and a key
            // format are configured; a mismatch between the two is an error.
            let key_encoder: Option<Box<dyn Encode>> =
                match (key_desc, connection.format.key_format) {
                    (Some(desc), Some(KafkaSinkFormatType::Bytes)) => {
                        Some(Box::new(BinaryEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Text)) => {
                        Some(Box::new(TextEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Json)) => {
                        Some(Box::new(JsonEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Avro {
                        schema,
                        compatibility_level,
                        csr_connection,
                    })) => {
                        // Ensure that schemas are registered with the schema registry.
                        //
                        // Note that where this lies in the rendering cycle means that we will publish the
                        // schemas each time the sink is rendered.
                        let ccsr = csr_connection
                            .connect(&storage_configuration, InTask::Yes)
                            .await?;

                        let schema_id = mz_storage_client::sink::publish_kafka_schema(
                            ccsr,
                            format!("{}-key", connection.topic),
                            schema.clone(),
                            mz_ccsr::SchemaType::Avro,
                            compatibility_level,
                        )
                        .await
                        .context("error publishing kafka schemas for sink")?;

                        Some(Box::new(AvroEncoder::new(desc, false, &schema, schema_id)))
                    }
                    (None, None) => None,
                    (desc, format) => {
                        return Err(anyhow!(
                            "key_desc and key_format must be both set or both unset, but key_desc: {:?}, key_format: {:?}",
                            desc,
                            format
                        ))
                    }
                };

            // whether to apply the debezium envelope to the value encoding
            let debezium = matches!(envelope, SinkEnvelope::Debezium);

            let value_encoder: Box<dyn Encode> = match connection.format.value_format {
                KafkaSinkFormatType::Bytes => Box::new(BinaryEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Text => Box::new(TextEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Json => Box::new(JsonEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level,
                    csr_connection,
                } => {
                    // Ensure that schemas are registered with the schema registry.
                    //
                    // Note that where this lies in the rendering cycle means that we will publish the
                    // schemas each time the sink is rendered.
                    let ccsr = csr_connection
                        .connect(&storage_configuration, InTask::Yes)
                        .await?;

                    let schema_id = mz_storage_client::sink::publish_kafka_schema(
                        ccsr,
                        format!("{}-value", connection.topic),
                        schema.clone(),
                        mz_ccsr::SchemaType::Avro,
                        compatibility_level,
                    )
                    .await
                    .context("error publishing kafka schemas for sink")?;

                    Box::new(AvroEncoder::new(value_desc, debezium, &schema, schema_id))
                }
            };

            // !IMPORTANT!
            // Correctness of this operator relies on no fallible operations happening after this
            // point. This is a temporary workaround of build_fallible's bad interaction of owned
            // capabilities and errors.
            // TODO(petrosagg): Make the fallible async operator safe
            //
            // After this reset, output is produced only under the capabilities
            // that arrive with input data (`cap` below).
            *capset = CapabilitySet::new();

            // Scratch buffers reused across messages to avoid reallocating per row.
            let mut row_buf = Row::default();
            let mut datums = DatumVec::new();
            // Synthetic keys are per-row hashes, not user keys, so primary-key
            // violations are only tracked for real keys.
            let mut pk_warner =
                (!key_is_synthetic).then(|| PkViolationWarner::new(sink_id, from_id));

            while let Some(event) = input.next().await {
                // Only data events produce output; other events are ignored here.
                if let Event::Data(cap, mut batches) = event {
                    for batch in batches.drain(..) {
                        for_each_diff_pair(&batch, |key, time, value| {
                            if let Some(warner) = pk_warner.as_mut() {
                                warner.observe(key, time);
                            }
                            // Only emit the arrangement key when the user configured one; relation-key
                            // and synthetic-hash arrangements exist purely for grouping / worker
                            // distribution and have no corresponding key encoder.
                            //
                            // NOTE(review): this checks `key_encoder`, not `key_is_synthetic`; it
                            // presumes no key encoder is ever configured alongside a synthetic key.
                            // Confirm with the caller.
                            let key_for_message = if key_encoder.is_some() { key } else { &None };

                            // `hash` holds the PARTITION BY result, if any. When it stays `None`,
                            // it falls back below to the encoded key's hash, or 0 without a key.
                            let mut hash = None;
                            let mut headers = vec![];
                            if connection.headers_index.is_some()
                                || connection.partition_by.is_some()
                            {
                                // Header values and partition by values are derived from the row
                                // that produces an event. But it is ambiguous whether to use the
                                // `before` or `after` from the event. The rule applied here is
                                // simple: use `after` if it exists (insertions and updates),
                                // otherwise fall back to `before` (deletions).
                                //
                                // It is up to the SQL planner to ensure this produces sensible
                                // results. (When using the upsert envelope and both `before` and
                                // `after` are present, it's always unambiguous to use `after`
                                // because that's all that will be present in the Kafka message;
                                // when using the Debezium envelope, it's okay to refer to columns
                                // in the key because those are guaranteed to be the same in both
                                // `before` and `after`.)
                                let row = value
                                    .after
                                    .as_ref()
                                    .or(value.before.as_ref())
                                    .expect("one of before or after must be set");
                                let row = datums.borrow_with(row);

                                if let Some(i) = connection.headers_index {
                                    headers = encode_headers(row[i]);
                                }

                                if let Some(partition_by) = &connection.partition_by {
                                    hash = Some(evaluate_partition_by(partition_by, &row));
                                }
                            }
                            let (encoded_key, hash) = match key_for_message {
                                Some(key) => {
                                    let key_encoder =
                                        key_encoder.as_ref().expect("key present");
                                    let encoded = key_encoder.encode_unchecked(key.clone());
                                    let hash =
                                        hash.unwrap_or_else(|| key_encoder.hash(&encoded));
                                    (Some(encoded), hash)
                                }
                                None => (None, hash.unwrap_or(0)),
                            };
                            let value = match envelope {
                                SinkEnvelope::Upsert => value.after,
                                SinkEnvelope::Debezium => {
                                    // Pack into the reused buffer, then hand the encoder an
                                    // owned copy.
                                    dbz_format(&mut row_buf.packer(), value);
                                    Some(row_buf.clone())
                                }
                                SinkEnvelope::Append => {
                                    unreachable!("Append envelope is not valid for Kafka sinks")
                                }
                            };
                            // A `None` value (e.g. an upsert deletion with no `after`) stays
                            // `None` in the message.
                            let value = value.map(|value| value_encoder.encode_unchecked(value));
                            let message = KafkaMessage {
                                hash,
                                key: encoded_key,
                                value,
                                headers,
                            };
                            output.give(&cap, (message, time, Diff::ONE));
                        });
                        // Flush after each batch so the final `(key, time)` group of the walk is
                        // resolved immediately — a PK violation in the last group is otherwise
                        // held until more data arrives or the operator shuts down.
                        if let Some(warner) = pk_warner.as_mut() {
                            warner.flush();
                        }
                    }
                }
            }

            Ok::<(), anyhow::Error>(())
        })
    });

    // Any error returned by the operator closure is reported as a halting
    // health status in the Kafka namespace.
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Kafka,
    });

    (stream.as_collection(), statuses, button.press_on_drop())
}
1615
1616fn encode_headers(datum: Datum) -> Vec<KafkaHeader> {
1617    let mut out = vec![];
1618    if datum.is_null() {
1619        return out;
1620    }
1621    for (key, value) in datum.unwrap_map().iter() {
1622        out.push(KafkaHeader {
1623            key: key.into(),
1624            value: match value {
1625                Datum::Null => None,
1626                Datum::String(s) => Some(s.as_bytes().to_vec()),
1627                Datum::Bytes(b) => Some(b.to_vec()),
1628                _ => panic!("encode_headers called with unexpected header value {value:?}"),
1629            },
1630        })
1631    }
1632    out
1633}
1634
1635/// Evaluates a partition by expression on the given row, returning the hash
1636/// value to use for partition assignment.
1637///
1638/// The provided expression must have type `Int32`, `Int64`, `UInt32`, or
1639/// `UInt64`. If the expression produces an error when evaluated, or if the
1640/// expression is of a signed integer type and produces a negative value, this
1641/// function returns 0.
1642fn evaluate_partition_by(partition_by: &MirScalarExpr, row: &[Datum]) -> u64 {
1643    // NOTE(benesch): The way this function converts errors and invalid values
1644    // to 0 is somewhat surpising. Ideally, we would put the sink in a
1645    // permanently errored state if the partition by expression produces an
1646    // error or invalid value. But we don't presently have a way for sinks to
1647    // report errors (see materialize#17688), so the current behavior was determined to be
1648    // the best available option. The behavior is clearly documented in the
1649    // user-facing `CREATE SINK` docs.
1650    let temp_storage = RowArena::new();
1651    match partition_by.eval(row, &temp_storage) {
1652        Ok(Datum::UInt64(u)) => u,
1653        Ok(datum) => {
1654            // If we are here the only valid type that we should be seeing is
1655            // null. Anything else is a bug in the planner.
1656            soft_assert_or_log!(datum.is_null(), "unexpected partition_by result: {datum:?}");
1657            // We treat nulls the same as we treat errors: map them to partition 0.
1658            0
1659        }
1660        Err(_) => 0,
1661    }
1662}
1663
#[cfg(test)]
mod test {
    use mz_ore::assert_err;

    use super::*;

    /// Checks that `parse_progress_record` accepts both the legacy
    /// `{"timestamp": ...}` payloads and the current `{"frontier": [...]}`
    /// payloads (with an optional `version`), and that it rejects malformed
    /// payloads.
    #[mz_ore::test]
    fn progress_record_migration() {
        // Payloads with neither a usable `timestamp` nor a usable `frontier`.
        let malformed: [&[u8]; 2] = [b"{}", b"{\"frontier\":null}"];
        for payload in malformed {
            assert_err!(parse_progress_record(payload));
        }

        let well_formed: [(&[u8], ProgressRecord); 5] = [
            // Legacy format: a timestamp of 1 becomes the frontier {2}.
            (
                b"{\"timestamp\":1}",
                ProgressRecord {
                    frontier: Antichain::from_elem(2.into()),
                    version: 0,
                },
            ),
            // Legacy format: a null timestamp becomes the empty frontier.
            (
                b"{\"timestamp\":null}",
                ProgressRecord {
                    frontier: Antichain::new(),
                    version: 0,
                },
            ),
            // Current format without a version is read as version 0.
            (
                b"{\"frontier\":[1]}",
                ProgressRecord {
                    frontier: Antichain::from_elem(1.into()),
                    version: 0,
                },
            ),
            (
                b"{\"frontier\":[]}",
                ProgressRecord {
                    frontier: Antichain::new(),
                    version: 0,
                },
            ),
            // Current format with an explicit version.
            (
                b"{\"frontier\":[], \"version\": 42}",
                ProgressRecord {
                    frontier: Antichain::new(),
                    version: 42,
                },
            ),
        ];
        for (payload, expected) in well_formed {
            assert_eq!(
                parse_progress_record(payload).unwrap(),
                expected,
                "payload: {}",
                String::from_utf8_lossy(payload),
            );
        }
    }
}