// mz_storage/sink/kafka.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Code to render the sink dataflow of a [`KafkaSinkConnection`]. The dataflow consists
11//! of two operators in order to take advantage of all the available workers.
12//!
13//! ```text
14//!        ┏━━━━━━━━━━━━━━┓
15//!        ┃   persist    ┃
16//!        ┃    source    ┃
17//!        ┗━━━━━━┯━━━━━━━┛
18//!               │ row data, the input to this module
19//!               │
20//!        ┏━━━━━━v━━━━━━┓
21//!        ┃    row      ┃
22//!        ┃   encoder   ┃
23//!        ┗━━━━━━┯━━━━━━┛
24//!               │ encoded data
25//!               │
26//!        ┏━━━━━━v━━━━━━┓
27//!        ┃    kafka    ┃ (single worker)
28//!        ┃    sink     ┃
29//!        ┗━━┯━━━━━━━━┯━┛
30//!   records │        │ uppers
31//!      ╭────v──╮ ╭───v──────╮
32//!      │ data  │ │ progress │  <- records and uppers are produced
33//!      │ topic │ │  topic   │     transactionally to both topics
34//!      ╰───────╯ ╰──────────╯
35//! ```
36//!
37//! # Encoding
38//!
39//! One part of the dataflow deals with encoding the rows that we read from persist. There isn't
//! anything surprising here, it is *almost* just a `Collection::map` with the exception of an
41//! initialization step that makes sure the schemas are published to the Schema Registry. After
42//! that step the operator just encodes each batch it receives record by record.
43//!
44//! # Sinking
45//!
46//! The other part of the dataflow, and what this module mostly deals with, is interacting with the
47//! Kafka cluster in order to transactionally commit batches (sets of records associated with a
48//! frontier). All the processing happens in a single worker and so all previously encoded records
49//! go through an exchange in order to arrive at the chosen worker. We may be able to improve this
50//! in the future by committing disjoint partitions of the key space for independent workers but
51//! for now we do the simple thing.
52//!
53//! ## Retries
54//!
55//! All of the retry logic heavy lifting is offloaded to `librdkafka` since it already implements
56//! the required behavior[1]. In particular we only ever enqueue records to its send queue and
57//! eventually call `commit_transaction` which will ensure that all queued messages are
58//! successfully delivered before the transaction is reported as committed.
59//!
60//! The only error that is possible during sending is that the queue is full. We are purposefully
61//! NOT handling this error and simply configure `librdkafka` with a very large queue. The reason
//! for this choice is that the only option for handling such an error ourselves would be to queue
63//! it, and there isn't a good argument about two small queues being better than one big one. If we
64//! reach the queue limit we simply error out the entire sink dataflow and start over.
65//!
66//! # Error handling
67//!
68//! Both the encoding operator and the sinking operator can produce a transient error that is wired
69//! up with our health monitoring and will trigger a restart of the sink dataflow.
70//!
71//! [1]: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#message-reliability
72
73use std::cell::RefCell;
74use std::cmp::Ordering;
75use std::collections::BTreeMap;
76use std::future::Future;
77use std::rc::Rc;
78use std::sync::atomic::AtomicU64;
79use std::sync::{Arc, Weak};
80use std::time::Duration;
81
82use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
83use crate::metrics::sink::kafka::KafkaSinkMetrics;
84use crate::render::sinks::SinkRender;
85use crate::statistics::SinkStatistics;
86use crate::storage_state::StorageState;
87use anyhow::{Context, anyhow, bail};
88use differential_dataflow::{AsCollection, Hashable, VecCollection};
89use futures::StreamExt;
90use maplit::btreemap;
91use mz_expr::MirScalarExpr;
92use mz_interchange::avro::{AvroEncoder, DiffPair};
93use mz_interchange::encode::Encode;
94use mz_interchange::envelopes::dbz_format;
95use mz_interchange::json::JsonEncoder;
96use mz_interchange::text_binary::{BinaryEncoder, TextEncoder};
97use mz_kafka_util::admin::EnsureTopicConfig;
98use mz_kafka_util::client::{
99    DEFAULT_FETCH_METADATA_TIMEOUT, GetPartitionsError, MzClientContext, TimeoutConfig,
100    TunnelingClientContext,
101};
102use mz_ore::cast::CastFrom;
103use mz_ore::collections::CollectionExt;
104use mz_ore::error::ErrorExt;
105use mz_ore::future::InTask;
106use mz_ore::soft_assert_or_log;
107use mz_ore::task::{self, AbortOnDropHandle};
108use mz_persist_client::Diagnostics;
109use mz_persist_client::write::WriteHandle;
110use mz_persist_types::codec_impls::UnitSchema;
111use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
112use mz_storage_client::sink::progress_key::ProgressKey;
113use mz_storage_types::StorageDiff;
114use mz_storage_types::configuration::StorageConfiguration;
115use mz_storage_types::controller::CollectionMetadata;
116use mz_storage_types::dyncfgs::{
117    KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS, SINK_ENSURE_TOPIC_CONFIG, SINK_PROGRESS_SEARCH,
118};
119use mz_storage_types::errors::{ContextCreationError, ContextCreationErrorExt, DataflowError};
120use mz_storage_types::sinks::{
121    KafkaSinkConnection, KafkaSinkFormatType, SinkEnvelope, StorageSinkDesc,
122};
123use mz_storage_types::sources::SourceData;
124use mz_timely_util::antichain::AntichainExt;
125use mz_timely_util::builder_async::{
126    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
127};
128use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
129use rdkafka::error::KafkaError;
130use rdkafka::message::{Header, OwnedHeaders, ToBytes};
131use rdkafka::producer::{BaseRecord, Producer, ThreadedProducer};
132use rdkafka::types::RDKafkaErrorCode;
133use rdkafka::{Message, Offset, Statistics, TopicPartitionList};
134use serde::{Deserialize, Deserializer, Serialize, Serializer};
135use timely::PartialOrder;
136use timely::container::CapacityContainerBuilder;
137use timely::dataflow::StreamVec;
138use timely::dataflow::channels::pact::{Exchange, Pipeline};
139use timely::dataflow::operators::vec::{Map, ToStream};
140use timely::dataflow::operators::{CapabilitySet, Concatenate};
141use timely::progress::{Antichain, Timestamp as _};
142use tokio::sync::watch;
143use tokio::time::{self, MissedTickBehavior};
144use tracing::{debug, error, info, warn};
145
146impl<'scope> SinkRender<'scope> for KafkaSinkConnection {
147    fn get_key_indices(&self) -> Option<&[usize]> {
148        self.key_desc_and_indices
149            .as_ref()
150            .map(|(_desc, indices)| indices.as_slice())
151    }
152
153    fn get_relation_key_indices(&self) -> Option<&[usize]> {
154        self.relation_key_indices.as_deref()
155    }
156
157    fn render_sink(
158        &self,
159        storage_state: &mut StorageState,
160        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
161        sink_id: GlobalId,
162        input: VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
163        // TODO(benesch): errors should stream out through the sink,
164        // if we figure out a protocol for that.
165        _err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
166    ) -> (
167        StreamVec<'scope, Timestamp, HealthStatusMessage>,
168        Vec<PressOnDropButton>,
169    ) {
170        let scope = input.scope();
171
172        let write_handle = {
173            let persist = Arc::clone(&storage_state.persist_clients);
174            let shard_meta = sink.to_storage_metadata.clone();
175            async move {
176                let client = persist.open(shard_meta.persist_location).await?;
177                let handle = client
178                    .open_writer(
179                        shard_meta.data_shard,
180                        Arc::new(shard_meta.relation_desc),
181                        Arc::new(UnitSchema),
182                        Diagnostics::from_purpose("sink handle"),
183                    )
184                    .await?;
185                Ok(handle)
186            }
187        };
188
189        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
190        storage_state
191            .sink_write_frontiers
192            .insert(sink_id, Rc::clone(&write_frontier));
193
194        let (encoded, encode_status, encode_token) = encode_collection(
195            format!("kafka-{sink_id}-{}-encode", self.format.get_format_name()),
196            input,
197            sink.envelope,
198            self.clone(),
199            storage_state.storage_configuration.clone(),
200        );
201
202        let metrics = storage_state.metrics.get_kafka_sink_metrics(sink_id);
203        let statistics = storage_state
204            .aggregated_statistics
205            .get_sink(&sink_id)
206            .expect("statistics initialized")
207            .clone();
208
209        let (sink_status, sink_token) = sink_collection(
210            format!("kafka-{sink_id}-sink"),
211            encoded,
212            sink_id,
213            self.clone(),
214            storage_state.storage_configuration.clone(),
215            sink,
216            metrics,
217            statistics,
218            write_handle,
219            write_frontier,
220        );
221
222        let running_status = Some(HealthStatusMessage {
223            id: None,
224            update: HealthStatusUpdate::Running,
225            namespace: StatusNamespace::Kafka,
226        })
227        .to_stream(scope);
228
229        let status = scope.concatenate([running_status, encode_status, sink_status]);
230
231        (status, vec![encode_token, sink_token])
232    }
233}
234
/// A Kafka producer that transactionally writes both data records and progress
/// records for a single sink. Constructed via [`TransactionalProducer::new`],
/// which also fences out any previous producer instance for the same sink.
struct TransactionalProducer {
    /// The task name used for any blocking calls spawned onto the tokio threadpool.
    task_name: String,
    /// The topic where all the updates go.
    data_topic: String,
    /// The topic where all the upper frontiers go.
    progress_topic: String,
    /// The key each progress record is associated with.
    progress_key: ProgressKey,
    /// The version of this sink, used to fence out previous versions from writing.
    sink_version: u64,
    /// The number of partitions in the target topic. Kept up to date by a
    /// background refresh task; initialized synchronously before first use.
    partition_count: Arc<AtomicU64>,
    /// A task to periodically refresh the partition count.
    _partition_count_task: AbortOnDropHandle<()>,
    /// The underlying Kafka producer.
    producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    /// A handle to the statistics associated with this sink.
    statistics: SinkStatistics,
    /// The number of messages staged for the currently open transaction. It is reset to zero
    /// every time a transaction commits.
    staged_messages: u64,
    /// The total number of bytes staged for the currently open transaction. It is reset to zero
    /// every time a transaction commits.
    staged_bytes: u64,
    /// The timeout to use for network operations.
    socket_timeout: Duration,
    /// The timeout to use for committing transactions.
    transaction_timeout: Duration,
}
265
impl TransactionalProducer {
    /// Initializes a transactional producer for the sink identified by `sink_id`. After this call
    /// returns it is guaranteed that all previous `TransactionalProducer` instances for the same
    /// sink have been fenced out (i.e. `init_transactions()` has been called successfully).
    ///
    /// Returns the producer together with the resume upper, i.e. the frontier from which this
    /// incarnation of the sink should resume producing, as recorded in the progress topic.
    async fn new(
        sink_id: GlobalId,
        connection: &KafkaSinkConnection,
        storage_configuration: &StorageConfiguration,
        metrics: Arc<KafkaSinkMetrics>,
        statistics: SinkStatistics,
        sink_version: u64,
    ) -> Result<(Self, Antichain<mz_repr::Timestamp>), ContextCreationError> {
        let client_id = connection.client_id(
            storage_configuration.config_set(),
            &storage_configuration.connection_context,
            sink_id,
        );
        let transactional_id =
            connection.transactional_id(&storage_configuration.connection_context, sink_id);

        let timeout_config = &storage_configuration.parameters.kafka_timeout_config;
        let mut options = BTreeMap::new();
        // Ensure that messages are sinked in order and without duplicates. Note that this only
        // applies to a single instance of a producer - in the case of restarts, all bets are off
        // and full exactly once support is required.
        options.insert("enable.idempotence", "true".into());
        // Use the compression type requested by the user.
        options.insert(
            "compression.type",
            connection.compression_type.to_librdkafka_option().into(),
        );
        // Set the maximum buffer size limit. We don't want to impose anything lower than the max
        // here as the operator has nothing better to do with the data than to buffer them.
        options.insert("queue.buffering.max.kbytes", "2147483647".into());
        // Disable the default buffer limit of 100k messages. We don't want to impose any limit
        // here as the operator has nothing better to do with the data than to buffer them.
        options.insert("queue.buffering.max.messages", "0".into());
        // Make the Kafka producer wait at least 10 ms before sending out MessageSets
        options.insert("queue.buffering.max.ms", format!("{}", 10));
        // Time out transactions after the configured timeout (see `transaction_timeout`).
        options.insert(
            "transaction.timeout.ms",
            format!("{}", timeout_config.transaction_timeout.as_millis()),
        );
        // Use the transactional ID requested by the user.
        options.insert("transactional.id", transactional_id);
        // Allow Kafka monitoring tools to identify this producer.
        options.insert("client.id", client_id);
        // We want to be notified regularly with statistics
        options.insert("statistics.interval.ms", "1000".into());

        let ctx = MzClientContext::default();

        // Forward librdkafka statistics to our Prometheus metrics in the background.
        let stats_receiver = ctx.subscribe_statistics();
        let task_name = format!("kafka_sink_metrics_collector:{sink_id}");
        task::spawn(
            || &task_name,
            collect_statistics(stats_receiver, Arc::clone(&metrics)),
        );

        let producer: ThreadedProducer<_> = connection
            .connection
            .create_with_context(storage_configuration, ctx, &options, InTask::Yes)
            .await?;

        // The partition count is fixed up after we ensure the topic exists.
        let partition_count = Arc::new(AtomicU64::new(0));
        let update_partition_count = {
            let partition_count = Arc::clone(&partition_count);
            let metrics = Arc::clone(&metrics);
            Arc::new(move |pc| {
                partition_count.store(pc, std::sync::atomic::Ordering::SeqCst);
                metrics.partition_count.set(pc);
            })
        };

        // Start a task that will keep the partition count up to date in the
        // background.
        let partition_count_task = task::spawn(
            || format!("kafka_sink_producer_fetch_metadata_loop:{sink_id}"),
            fetch_partition_count_loop(
                producer.clone(),
                sink_id,
                connection.topic.clone(),
                connection.topic_metadata_refresh_interval,
                Arc::clone(&update_partition_count),
            ),
        );

        let task_name = format!("kafka_sink_producer:{sink_id}");
        let progress_key = ProgressKey::new(sink_id);

        let producer = Self {
            task_name,
            data_topic: connection.topic.clone(),
            partition_count,
            _partition_count_task: partition_count_task.abort_on_drop(),
            progress_topic: connection
                .progress_topic(&storage_configuration.connection_context)
                .into_owned(),
            progress_key,
            sink_version,
            producer,
            statistics,
            staged_messages: 0,
            staged_bytes: 0,
            socket_timeout: timeout_config.socket_timeout,
            transaction_timeout: timeout_config.transaction_timeout,
        };

        // Fence out all previous producers with the same transactional ID.
        let timeout = timeout_config.socket_timeout;
        producer
            .spawn_blocking(move |p| p.init_transactions(timeout))
            .await?;

        // We have just called init_transactions, which means that we have fenced out all previous
        // transactional producers, making it safe to determine the resume upper.
        let progress = determine_sink_progress(
            sink_id,
            connection,
            storage_configuration,
            Arc::clone(&metrics),
        )
        .await?;

        let resume_upper = match progress {
            Some(progress) => {
                // A newer version of this sink has already written progress; refuse to
                // start so we don't clobber its output.
                if sink_version < progress.version {
                    return Err(ContextCreationError::Other(anyhow!(
                        "Fenced off by newer version of the sink. ours={} theirs={}",
                        sink_version,
                        progress.version
                    )));
                }
                progress.frontier
            }
            None => {
                // No progress recorded yet: make sure the data topic exists and start
                // from the minimum timestamp.
                mz_storage_client::sink::ensure_kafka_topic(
                    connection,
                    storage_configuration,
                    &connection.topic,
                    &connection.topic_options,
                    EnsureTopicConfig::Skip,
                )
                .await?;
                Antichain::from_elem(Timestamp::minimum())
            }
        };

        // At this point the topic must exist and so we can query for its
        // partition count. Even though we have a background task to fetch the
        // partition count, we do this synchronously to ensure we don't attempt
        // to produce any messages with our initial partition count of 0.
        let partition_count =
            fetch_partition_count(&producer.producer, sink_id, &connection.topic).await?;
        update_partition_count(partition_count);

        Ok((producer, resume_upper))
    }

    /// Runs the blocking operation `f` on the producer in the tokio threadpool and checks for SSH
    /// status in case of failure.
    async fn spawn_blocking<F, R>(&self, f: F) -> Result<R, ContextCreationError>
    where
        F: FnOnce(
                ThreadedProducer<TunnelingClientContext<MzClientContext>>,
            ) -> Result<R, KafkaError>
            + Send
            + 'static,
        R: Send + 'static,
    {
        let producer = self.producer.clone();
        task::spawn_blocking(|| &self.task_name, move || f(producer))
            .await
            .check_ssh_status(self.producer.context())
    }

    /// Begins a new transaction on the underlying producer.
    async fn begin_transaction(&self) -> Result<(), ContextCreationError> {
        self.spawn_blocking(|p| p.begin_transaction()).await
    }

    /// Synchronously puts the provided message to librdkafka's send queue. This method only
    /// returns an error if the queue is full. Handling this error by buffering the message and
    /// retrying is equivalent to adjusting the maximum number of queued items in rdkafka so it is
    /// advised that callers only handle this error in order to apply backpressure to the rest of
    /// the system.
    fn send(
        &mut self,
        message: &KafkaMessage,
        time: Timestamp,
        diff: Diff,
    ) -> Result<(), KafkaError> {
        assert_eq!(diff, Diff::ONE, "invalid sink update");

        // Attach the update's timestamp as a reserved header.
        let mut headers = OwnedHeaders::new().insert(Header {
            key: "materialize-timestamp",
            value: Some(time.to_string().as_bytes()),
        });
        for header in &message.headers {
            // Headers that start with `materialize-` are reserved for our
            // internal use, so we silently drop any such user-specified
            // headers. While this behavior is documented, it'd be a nicer UX to
            // send a warning or error somewhere. Unfortunately sinks don't have
            // anywhere user-visible to send errors. See database-issues#5148.
            if header.key.starts_with("materialize-") {
                continue;
            }

            headers = headers.insert(Header {
                key: header.key.as_str(),
                value: header.value.as_ref(),
            });
        }

        // Pick the partition from the key hash. `new()` fetches the partition count
        // synchronously before any message can be sent, so `pc` is expected to be
        // nonzero here; a zero count would panic on the modulo below.
        let pc = self
            .partition_count
            .load(std::sync::atomic::Ordering::SeqCst);
        let partition = Some(i32::try_from(message.hash % pc).unwrap());

        let record = BaseRecord {
            topic: &self.data_topic,
            key: message.key.as_ref(),
            payload: message.value.as_ref(),
            headers: Some(headers),
            partition,
            timestamp: None,
            delivery_opaque: (),
        };
        // Account the staged record's size (key + value + headers) in statistics.
        let key_size = message.key.as_ref().map(|k| k.len()).unwrap_or(0);
        let value_size = message.value.as_ref().map(|k| k.len()).unwrap_or(0);
        let headers_size = message
            .headers
            .iter()
            .map(|h| h.key.len() + h.value.as_ref().map(|v| v.len()).unwrap_or(0))
            .sum::<usize>();
        let record_size = u64::cast_from(key_size + value_size + headers_size);
        self.statistics.inc_messages_staged_by(1);
        self.staged_messages += 1;
        self.statistics.inc_bytes_staged_by(record_size);
        self.staged_bytes += record_size;
        self.producer.send(record).map_err(|(e, _)| e)
    }

    /// Commits all the staged updates of the currently open transaction plus a progress record
    /// describing `upper` to the progress topic.
    ///
    /// On success the staged message/byte counters are moved into the committed statistics and
    /// reset. On a transaction error one abort attempt is made before the error is returned.
    async fn commit_transaction(
        &mut self,
        upper: Antichain<Timestamp>,
    ) -> Result<(), ContextCreationError> {
        // The progress record is enqueued as part of the same transaction as the data,
        // so readers of the progress topic only ever observe committed frontiers.
        let progress = ProgressRecord {
            frontier: upper,
            version: self.sink_version,
        };
        let payload = serde_json::to_vec(&progress).expect("infallible");
        let record = BaseRecord::to(&self.progress_topic)
            .payload(&payload)
            .key(&self.progress_key);
        self.producer.send(record).map_err(|(e, _)| e)?;

        fail::fail_point!("kafka_sink_commit_transaction");

        let timeout = self.transaction_timeout;
        match self
            .spawn_blocking(move |p| p.commit_transaction(timeout))
            .await
        {
            Ok(()) => {
                self.statistics
                    .inc_messages_committed_by(self.staged_messages);
                self.statistics.inc_bytes_committed_by(self.staged_bytes);
                self.staged_messages = 0;
                self.staged_bytes = 0;
                Ok(())
            }
            Err(ContextCreationError::KafkaError(KafkaError::Transaction(err))) => {
                // Make one attempt at aborting the transaction before letting the error percolate
                // up and the process exit. Aborting allows the consumers of the topic to skip over
                // any messages we've written in the transaction, so it's polite to do... but if it
                // fails, the transaction will be aborted either when fenced out by a future
                // version of this producer or by the broker-side timeout.
                if err.txn_requires_abort() {
                    let timeout = self.socket_timeout;
                    self.spawn_blocking(move |p| p.abort_transaction(timeout))
                        .await?;
                }
                Err(ContextCreationError::KafkaError(KafkaError::Transaction(
                    err,
                )))
            }
            Err(err) => Err(err),
        }
    }
}
559
560/// Listens for statistics updates from librdkafka and updates our Prometheus metrics.
561async fn collect_statistics(
562    mut receiver: watch::Receiver<Statistics>,
563    metrics: Arc<KafkaSinkMetrics>,
564) {
565    let mut outbuf_cnt: i64 = 0;
566    let mut outbuf_msg_cnt: i64 = 0;
567    let mut waitresp_cnt: i64 = 0;
568    let mut waitresp_msg_cnt: i64 = 0;
569    let mut txerrs: u64 = 0;
570    let mut txretries: u64 = 0;
571    let mut req_timeouts: u64 = 0;
572    let mut connects: i64 = 0;
573    let mut disconnects: i64 = 0;
574    while receiver.changed().await.is_ok() {
575        let stats = receiver.borrow();
576        for broker in stats.brokers.values() {
577            outbuf_cnt += broker.outbuf_cnt;
578            outbuf_msg_cnt += broker.outbuf_msg_cnt;
579            waitresp_cnt += broker.waitresp_cnt;
580            waitresp_msg_cnt += broker.waitresp_msg_cnt;
581            txerrs += broker.txerrs;
582            txretries += broker.txretries;
583            req_timeouts += broker.req_timeouts;
584            connects += broker.connects.unwrap_or(0);
585            disconnects += broker.disconnects.unwrap_or(0);
586        }
587        metrics.rdkafka_msg_cnt.set(stats.msg_cnt);
588        metrics.rdkafka_msg_size.set(stats.msg_size);
589        metrics.rdkafka_txmsgs.set(stats.txmsgs);
590        metrics.rdkafka_txmsg_bytes.set(stats.txmsg_bytes);
591        metrics.rdkafka_tx.set(stats.tx);
592        metrics.rdkafka_tx_bytes.set(stats.tx_bytes);
593        metrics.rdkafka_outbuf_cnt.set(outbuf_cnt);
594        metrics.rdkafka_outbuf_msg_cnt.set(outbuf_msg_cnt);
595        metrics.rdkafka_waitresp_cnt.set(waitresp_cnt);
596        metrics.rdkafka_waitresp_msg_cnt.set(waitresp_msg_cnt);
597        metrics.rdkafka_txerrs.set(txerrs);
598        metrics.rdkafka_txretries.set(txretries);
599        metrics.rdkafka_req_timeouts.set(req_timeouts);
600        metrics.rdkafka_connects.set(connects);
601        metrics.rdkafka_disconnects.set(disconnects);
602    }
603}
604
/// A message to produce to Kafka.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct KafkaMessage {
    /// A hash of the key that can be used for partitioning. `send` maps it onto a
    /// partition via `hash % partition_count`.
    hash: u64,
    /// The message key.
    key: Option<Vec<u8>>,
    /// The message value.
    value: Option<Vec<u8>>,
    /// Message headers. Keys prefixed with `materialize-` are reserved and are
    /// silently dropped when the message is sent.
    headers: Vec<KafkaHeader>,
}
617
/// A header to attach to a Kafka message.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct KafkaHeader {
    /// The header key.
    key: String,
    /// The header value; Kafka permits headers with no value.
    value: Option<Vec<u8>>,
}
626
627/// Sinks a collection of encoded rows to Kafka.
628///
629/// This operator exchanges all updates to a single worker by hashing on the given sink `id`.
630///
631/// Updates are sent in ascending timestamp order.
632fn sink_collection<'scope>(
633    name: String,
634    input: VecCollection<'scope, Timestamp, KafkaMessage, Diff>,
635    sink_id: GlobalId,
636    connection: KafkaSinkConnection,
637    storage_configuration: StorageConfiguration,
638    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
639    metrics: KafkaSinkMetrics,
640    statistics: SinkStatistics,
641    write_handle: impl Future<
642        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
643    > + 'static,
644    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
645) -> (
646    StreamVec<'scope, Timestamp, HealthStatusMessage>,
647    PressOnDropButton,
648) {
649    let scope = input.scope();
650    let mut builder = AsyncOperatorBuilder::new(name.clone(), input.inner.scope());
651
652    // We want exactly one worker to send all the data to the sink topic.
653    let hashed_id = sink_id.hashed();
654    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
655    let buffer_min_capacity =
656        KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS.handle(storage_configuration.config_set());
657
658    let mut input = builder.new_disconnected_input(input.inner, Exchange::new(move |_| hashed_id));
659
660    let as_of = sink.as_of.clone();
661    let sink_version = sink.version;
662    let (button, errors) = builder.build_fallible(move |_caps| {
663        Box::pin(async move {
664            if !is_active_worker {
665                write_frontier.borrow_mut().clear();
666                return Ok(());
667            }
668
669            fail::fail_point!("kafka_sink_creation_error", |_| Err(
670                ContextCreationError::Other(anyhow::anyhow!("synthetic error"))
671            ));
672
673            let mut write_handle = write_handle.await?;
674
675            let metrics = Arc::new(metrics);
676
677            let (mut producer, resume_upper) = TransactionalProducer::new(
678                sink_id,
679                &connection,
680                &storage_configuration,
681                Arc::clone(&metrics),
682                statistics,
683                sink_version,
684            )
685            .await?;
686
687            // The input has overcompacted if
688            let overcompacted =
689                // ..we have made some progress in the past
690                *resume_upper != [Timestamp::minimum()] &&
691                // ..but the since frontier is now beyond that
692                !PartialOrder::less_equal(&as_of, &resume_upper);
693            if overcompacted {
694                let err = format!(
695                    "{name}: input compacted past resume upper: as_of {}, resume_upper: {}",
696                    as_of.pretty(),
697                    resume_upper.pretty()
698                );
699                // This would normally be an assertion but because it can happen after a
700                // Materialize backup/restore we log an error so that it appears on Sentry but
701                // leaves the rest of the objects in the cluster unaffected.
702                error!("{err}");
703                return Err(anyhow!("{err}").into());
704            }
705
706            info!(
707                "{name}: as_of: {}, resume upper: {}",
708                as_of.pretty(),
709                resume_upper.pretty()
710            );
711
712            // The section below relies on TotalOrder for correctness so we'll work with timestamps
713            // directly to make sure this doesn't compile if someone attempts to make this operator
714            // generic over partial orders in the future.
715            let Some(mut upper) = resume_upper.clone().into_option() else {
716                write_frontier.borrow_mut().clear();
717                return Ok(());
718            };
719
720            let mut deferred_updates = vec![];
721            let mut extra_updates = vec![];
722            // We must wait until we have data to commit before starting a transaction because
723            // Kafka doesn't have a heartbeating mechanism to keep a transaction open indefinitely.
724            // This flag tracks whether we have started the transaction.
725            let mut transaction_begun = false;
726            while let Some(event) = input.next().await {
727                match event {
728                    Event::Data(_cap, batch) => {
729                        for (message, time, diff) in batch {
730                            // We want to publish updates in time order and we know that we have
731                            // already committed all times not beyond `upper`. Therefore, if this
732                            // update happens *exactly* at upper then it is the minimum pending
733                            // time and so emitting it now will not violate the timestamp publish
734                            // order. This optimization is load bearing because it is the mechanism
735                            // by which we incrementally stream the initial snapshot out to Kafka
736                            // instead of buffering it all in memory first. This argument doesn't
737                            // hold for partially ordered time because many different timestamps
738                            // can be *exactly* at upper but we can't know ahead of time which one
739                            // will be advanced in the next progress message.
740                            match upper.cmp(&time) {
741                                Ordering::Less => deferred_updates.push((message, time, diff)),
742                                Ordering::Equal => {
743                                    if !transaction_begun {
744                                        producer.begin_transaction().await?;
745                                        transaction_begun = true;
746                                    }
747                                    producer.send(&message, time, diff)?;
748                                }
749                                Ordering::Greater => continue,
750                            }
751                        }
752                    }
753                    Event::Progress(progress) => {
754                        // Ignore progress updates before our resumption frontier
755                        if !PartialOrder::less_equal(&resume_upper, &progress) {
756                            continue;
757                        }
758                        // Also ignore progress updates until we are past the as_of frontier. This
759                        // is to avoid the following pathological scenario:
760                        // 1. Sink gets instantiated with an as_of = {10}, resume_upper = {0}.
761                        //    `progress` initially jumps at {10}, then the snapshot appears at time
762                        //    10.
763                        // 2. `progress` would normally advance to say {11} and we would commit the
764                        //    snapshot but clusterd crashes instead.
765                        // 3. A new cluster restarts the sink with an earlier as_of, say {5}. This
766                        //    is valid, the earlier as_of has strictly more information. The
767                        //    snapshot now appears at time 5.
768                        //
769                        // If we were to commit an empty transaction in step 1 and advanced the
770                        // resume_upper to {10} then in step 3 we would ignore the snapshot that
771                        // now appears at 5 completely. So it is important to only start committing
772                        // transactions after we're strictly beyond the as_of.
773                        // TODO(petrosagg): is this logic an indication of us holding something
774                        // wrong elsewhere? Investigate.
775                        // Note: !PartialOrder::less_than(as_of, progress) would not be equivalent
776                        // nor correct for partially ordered times.
777                        if !as_of.iter().all(|t| !progress.less_equal(t)) {
778                            continue;
779                        }
780                        if !transaction_begun {
781                            producer.begin_transaction().await?;
782                        }
783
784                        // N.B. Shrinking the Vec here is important because when starting the Sink
785                        // we might buffer a ton of updates into these collections, e.g. if someone
786                        // deleted the progress topic and the resume upper is 0, and we don't want
                        // to keep around a massively oversized Vec.
788                        deferred_updates.shrink_to(buffer_min_capacity.get());
789                        extra_updates.extend(
790                            deferred_updates
791                                .extract_if(.., |(_, time, _)| !progress.less_equal(time)),
792                        );
793                        extra_updates.sort_unstable_by(|a, b| a.1.cmp(&b.1));
794
795                        // N.B. See the comment above.
796                        extra_updates.shrink_to(buffer_min_capacity.get());
797                        for (message, time, diff) in extra_updates.drain(..) {
798                            producer.send(&message, time, diff)?;
799                        }
800
801                        debug!("{name}: committing transaction for {}", progress.pretty());
802                        producer.commit_transaction(progress.clone()).await?;
803                        transaction_begun = false;
804                        let mut expect_upper = write_handle.shared_upper();
805                        loop {
806                            if PartialOrder::less_equal(&progress, &expect_upper) {
807                                // The frontier has already been advanced as far as necessary.
808                                break;
809                            }
810                            // TODO(sinks): include the high water mark in the output topic for
811                            // the messages we've published, if and when we allow reads to the sink
812                            // directly, to allow monitoring the progress of the sink in terms of
813                            // the output system.
814                            const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
815                            match write_handle
816                                .compare_and_append(EMPTY, expect_upper, progress.clone())
817                                .await
818                                .expect("valid usage")
819                            {
820                                Ok(()) => break,
821                                Err(mismatch) => {
822                                    expect_upper = mismatch.current;
823                                }
824                            }
825                        }
826                        write_frontier.borrow_mut().clone_from(&progress);
827                        match progress.into_option() {
828                            Some(new_upper) => upper = new_upper,
829                            None => break,
830                        }
831                    }
832                }
833            }
834            Ok(())
835        })
836    });
837
838    let statuses = errors.map(|error: Rc<ContextCreationError>| {
839        let hint = match *error {
840            ContextCreationError::KafkaError(KafkaError::Transaction(ref e)) => {
841                if e.is_retriable() && e.code() == RDKafkaErrorCode::OperationTimedOut {
842                    let hint = "If you're running a single Kafka broker, ensure that the configs \
843                        transaction.state.log.replication.factor, transaction.state.log.min.isr, \
844                        and offsets.topic.replication.factor are set to 1 on the broker";
845                    Some(hint.to_owned())
846                } else {
847                    None
848                }
849            }
850            _ => None,
851        };
852
853        HealthStatusMessage {
854            id: None,
855            update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), hint),
856            namespace: if matches!(*error, ContextCreationError::Ssh(_)) {
857                StatusNamespace::Ssh
858            } else {
859                StatusNamespace::Kafka
860            },
861        }
862    });
863
864    (statuses, button.press_on_drop())
865}
866
/// Determines the latest progress record from the specified topic for the given
/// progress key.
///
/// Returns `Ok(None)` when the progress topic does not exist or contains no
/// progress record for this sink's key.
///
/// IMPORTANT: to achieve exactly once guarantees, the producer that will resume
/// production at the returned timestamp *must* have called `init_transactions`
/// prior to calling this method.
async fn determine_sink_progress(
    sink_id: GlobalId,
    connection: &KafkaSinkConnection,
    storage_configuration: &StorageConfiguration,
    metrics: Arc<KafkaSinkMetrics>,
) -> Result<Option<ProgressRecord>, ContextCreationError> {
    // ****************************** WARNING ******************************
    // Be VERY careful when editing the code in this function. It is very easy
    // to accidentally introduce a correctness or liveness bug when refactoring
    // this code.
    // ****************************** WARNING ******************************

    let TimeoutConfig {
        fetch_metadata_timeout,
        progress_record_fetch_timeout,
        ..
    } = storage_configuration.parameters.kafka_timeout_config;

    let client_id = connection.client_id(
        storage_configuration.config_set(),
        &storage_configuration.connection_context,
        sink_id,
    );
    let group_id = connection.progress_group_id(&storage_configuration.connection_context, sink_id);
    let progress_topic = connection
        .progress_topic(&storage_configuration.connection_context)
        .into_owned();
    let progress_topic_options = &connection.connection.progress_topic_options;
    let progress_key = ProgressKey::new(sink_id);

    // Consumer options shared by both clients constructed below.
    let common_options = btreemap! {
        // Consumer group ID, which may have been overridden by the user. librdkafka requires this,
        // even though we'd prefer to disable the consumer group protocol entirely.
        "group.id" => group_id,
        // Allow Kafka monitoring tools to identify this consumer.
        "client.id" => client_id,
        "enable.auto.commit" => "false".into(),
        "auto.offset.reset" => "earliest".into(),
        // The fetch loop below needs EOF notifications to reliably detect that we have reached the
        // high watermark.
        "enable.partition.eof" => "true".into(),
    };

    // Construct two clients in read committed and read uncommitted isolation levels respectively.
    // See the comment on the watermark fetch below for an explanation of why we need both.
    let progress_client_read_committed: BaseConsumer<_> = {
        let mut opts = common_options.clone();
        opts.insert("isolation.level", "read_committed".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    let progress_client_read_uncommitted: BaseConsumer<_> = {
        let mut opts = common_options;
        opts.insert("isolation.level", "read_uncommitted".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    let ctx = Arc::clone(progress_client_read_committed.client().context());

    // Ensure the progress topic exists.
    let ensure_topic_config =
        match &*SINK_ENSURE_TOPIC_CONFIG.get(storage_configuration.config_set()) {
            "skip" => EnsureTopicConfig::Skip,
            "check" => EnsureTopicConfig::Check,
            "alter" => EnsureTopicConfig::Alter,
            _ => {
                tracing::warn!(
                    topic = progress_topic,
                    "unexpected value for ensure-topic-config; skipping checks"
                );
                EnsureTopicConfig::Skip
            }
        };
    mz_storage_client::sink::ensure_kafka_topic(
        connection,
        storage_configuration,
        &progress_topic,
        progress_topic_options,
        ensure_topic_config,
    )
    .await
    .add_context("error registering kafka progress topic for sink")?;

    // We are about to spawn a blocking task that cannot be aborted by simply calling .abort() on
    // its handle but we must be able to cancel it promptly so as to not leave long running
    // operations around when interest in this task is lost. To accomplish this we create a shared
    // token of which a weak reference is given to the task and a strong reference is held by the
    // parent task. The task periodically checks if its weak reference is still valid before
    // continuing its work.
    let parent_token = Arc::new(());
    let child_token = Arc::downgrade(&parent_token);
    let task_name = format!("get_latest_ts:{sink_id}");
    // Whether to probe exponentially-growing suffixes of the topic before falling back to a full
    // scan; see the comment on `start_indices` below.
    let sink_progress_search = SINK_PROGRESS_SEARCH.get(storage_configuration.config_set());
    let result = task::spawn_blocking(|| task_name, move || {
        let progress_topic = progress_topic.as_ref();
        // Ensure the progress topic has exactly one partition. Kafka only
        // guarantees ordering within a single partition, and we need a strict
        // order on the progress messages we read and write.
        let partitions = match mz_kafka_util::client::get_partitions(
            progress_client_read_committed.client(),
            progress_topic,
            fetch_metadata_timeout,
        ) {
            Ok(partitions) => partitions,
            Err(GetPartitionsError::TopicDoesNotExist) => {
                // The progress topic doesn't exist, which indicates there is
                // no committed timestamp.
                return Ok(None);
            }
            e => e.with_context(|| {
                format!(
                    "Unable to fetch metadata about progress topic {}",
                    progress_topic
                )
            })?,
        };
        if partitions.len() != 1 {
            bail!(
                    "Progress topic {} should contain a single partition, but instead contains {} partitions",
                    progress_topic, partitions.len(),
                );
        }
        let partition = partitions.into_element();

        // We scan from the beginning and see if we can find a progress record. We have
        // to do it like this because Kafka Control Batches mess with offsets. We
        // therefore cannot simply take the last offset from the back and expect a
        // progress message there. With a transactional producer, the OffsetTail(1) will
        // not point to a progress message but a control message. With aborted
        // transactions, there might even be a lot of garbage at the end of the
        // topic or in between.

        metrics.consumed_progress_records.set(0);

        // First, determine the current high water mark for the progress topic.
        // This is the position our `progress_client` consumer *must* reach
        // before we can conclude that we've seen the latest progress record for
        // the specified `progress_key`. A safety argument:
        //
        //   * Our caller has initialized transactions before calling this
        //     method, which prevents the prior incarnation of this sink from
        //     committing any further progress records.
        //
        //   * We use `read_uncommitted` isolation to ensure that we fetch the
        //     true high water mark for the topic, even if there are pending
        //     transactions in the topic. If we used the `read_committed`
        //     isolation level, we'd instead get the "last stable offset" (LSO),
        //     which is the offset of the first message in an open transaction,
        //     which might not include the last progress message committed for
        //     this sink! (While the caller of this function has fenced out
        //     older producers for this sink, *other* sinks writing using the
        //     same progress topic might have long-running transactions that
        //     hold back the LSO.)
        //
        //   * If another sink spins up and fences out the producer for this
        //     incarnation of the sink, we may not see the latest progress
        //     record... but since the producer has been fenced out, it will be
        //     unable to act on our stale information.
        //
        let (lo, hi) = progress_client_read_uncommitted
            .fetch_watermarks(progress_topic, partition, fetch_metadata_timeout)
            .map_err(|e| {
                anyhow!(
                    "Failed to fetch metadata while reading from progress topic: {}",
                    e
                )
            })?;

        // This topic might be long, but the desired offset will usually be right near the end.
        // Instead of always scanning through the entire topic, we scan through exponentially-growing
        // suffixes of it. (Because writes are ordered, the largest progress record in any suffix,
        // if present, is the global max.) If we find it in one of our suffixes, we've saved at least
        // an order of magnitude of work; if we don't, we've added at most a constant factor.
        let mut start_indices = vec![lo];
        if sink_progress_search {
            let mut lookback = hi.saturating_sub(lo) / 10;
            while lookback >= 20_000 {
                start_indices.push(hi - lookback);
                lookback /= 10;
            }
        }
        // Start indices were pushed in order of increasing lookback, so iterate in reverse to
        // search the smallest (cheapest) suffix first.
        for lo in start_indices.into_iter().rev() {
            if let Some(found) = progress_search(
                &progress_client_read_committed,
                progress_record_fetch_timeout,
                progress_topic,
                partition,
                lo,
                hi,
                progress_key.clone(),
                Weak::clone(&child_token),
                Arc::clone(&metrics)
            )? {
                return Ok(Some(found));
            }
        }
        Ok(None)
    }).await.check_ssh_status(&ctx);
    // Keep the strong token alive until after we've received the task's result, so the blocking
    // task is not cancelled while we are still interested in its outcome.
    drop(parent_token);
    result
}
1083
/// Searches the single-partition progress topic for the latest progress record
/// bearing `progress_key` within the offset range `[lo, hi)`, reading with the
/// given read-committed consumer.
///
/// Returns `Ok(None)` if no matching progress record exists in the range.
/// Fails if the operation is cancelled (no strong references to the shared
/// token behind `child_token` remain), if fetching times out, or if the
/// recorded upper frontiers regress within the range.
fn progress_search<C: ConsumerContext + 'static>(
    progress_client_read_committed: &BaseConsumer<C>,
    progress_record_fetch_timeout: Duration,
    progress_topic: &str,
    partition: i32,
    lo: i64,
    hi: i64,
    progress_key: ProgressKey,
    child_token: Weak<()>,
    metrics: Arc<KafkaSinkMetrics>,
) -> anyhow::Result<Option<ProgressRecord>> {
    // Seek to the beginning of the given range in the progress topic.
    let mut tps = TopicPartitionList::new();
    tps.add_partition(progress_topic, partition);
    tps.set_partition_offset(progress_topic, partition, Offset::Offset(lo))?;
    progress_client_read_committed
        .assign(&tps)
        .with_context(|| {
            format!(
                "Error seeking in progress topic {}:{}",
                progress_topic, partition
            )
        })?;

    // Helper to get the progress consumer's current position. Doubles as the
    // cancellation check: it bails if the parent task has dropped its strong
    // reference to the shared token.
    let get_position = || {
        if child_token.strong_count() == 0 {
            bail!("operation cancelled");
        }
        let position = progress_client_read_committed
            .position()?
            .find_partition(progress_topic, partition)
            .ok_or_else(|| {
                anyhow!(
                    "No position info found for progress topic {}",
                    progress_topic
                )
            })?
            .offset();
        let position = match position {
            Offset::Offset(position) => position,
            // An invalid offset indicates the consumer has not yet read a
            // message. Since we assigned the consumer to the beginning of
            // the topic, it's safe to return the low water mark here, which
            // indicates the position before the first possible message.
            //
            // Note that it's important to return the low water mark and not
            // the minimum possible offset (i.e., zero) in order to break
            // out of the loop if the topic is empty but the low water mark
            // is greater than zero.
            Offset::Invalid => lo,
            _ => bail!(
                "Consumer::position returned offset of wrong type: {:?}",
                position
            ),
        };
        // Record the outstanding number of progress records that remain to be processed
        let outstanding = u64::try_from(std::cmp::max(0, hi - position)).unwrap();
        metrics.outstanding_progress_records.set(outstanding);
        Ok(position)
    };

    info!("fetching latest progress record for {progress_key}, lo/hi: {lo}/{hi}");

    // Read messages until the consumer is positioned at or beyond the high
    // water mark.
    //
    // We use `read_committed` isolation to ensure we don't see progress
    // records for transactions that did not commit. This means we have to
    // wait for the LSO to progress to the high water mark `hi`, which means
    // waiting for any open transactions for other sinks using the same
    // progress topic to complete. We set a short transaction timeout (10s)
    // to ensure we never need to wait more than 10s.
    //
    // Note that the stall time on the progress topic is not a function of
    // transaction size. We've designed our transactions so that the
    // progress record is always written last, after all the data has been
    // written, and so the window of time in which the progress topic has an
    // open transaction is quite small. The only vulnerability is if another
    // sink using the same progress topic crashes in that small window
    // between writing the progress record and committing the transaction,
    // in which case we have to wait out the transaction timeout.
    //
    // Important invariant: we only exit this loop successfully (i.e., not
    // returning an error) if we have positive proof of a position at or
    // beyond the high water mark. To make this invariant easy to check, do
    // not use `break` in the body of the loop.
    let mut last_progress: Option<ProgressRecord> = None;
    loop {
        let current_position = get_position()?;

        if current_position >= hi {
            // consumer is at or beyond the high water mark and has read enough messages
            break;
        }

        let message = match progress_client_read_committed.poll(progress_record_fetch_timeout) {
            Some(Ok(message)) => message,
            Some(Err(KafkaError::PartitionEOF(_))) => {
                // No message, but the consumer's position may have advanced
                // past a transaction control message that positions us at
                // or beyond the high water mark. Go around the loop again
                // to check.
                continue;
            }
            Some(Err(e)) => bail!("failed to fetch progress message {e}"),
            None => {
                bail!(
                    "timed out while waiting to reach high water mark of non-empty \
                        topic {progress_topic}:{partition}, lo/hi: {lo}/{hi}, current position: {current_position}"
                );
            }
        };

        if message.key() != Some(progress_key.to_bytes()) {
            // This is a progress message for a different sink.
            continue;
        }

        metrics.consumed_progress_records.inc();

        // Progress messages with no payload carry no usable frontier; skip them.
        let Some(payload) = message.payload() else {
            continue;
        };
        let progress = parse_progress_record(payload)?;

        // Progress records must be written with monotonically non-decreasing
        // frontiers; a regression indicates corruption or misuse of the topic.
        match last_progress {
            Some(last_progress)
                if !PartialOrder::less_equal(&last_progress.frontier, &progress.frontier) =>
            {
                bail!(
                    "upper regressed in topic {progress_topic}:{partition} from {:?} to {:?}",
                    &last_progress.frontier,
                    &progress.frontier,
                );
            }
            _ => last_progress = Some(progress),
        }
    }

    // If we get here, we are assured that we've read all messages up to
    // the high water mark, and therefore `last_progress` contains the
    // most recent progress record for the sink under consideration.
    Ok(last_progress)
}
1229
/// This is the legacy struct that used to be emitted as part of a transactional produce and
/// contains the largest timestamp within the batch committed. Since it is just a timestamp it
/// cannot encode the fact that a sink has finished and deviates from upper frontier semantics.
/// Materialize no longer produces this record but it's possible that we encounter this in topics
/// written by older versions. In those cases we convert it into upper semantics by stepping the
/// timestamp forward.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct LegacyProgressRecord {
    // A double `Option` tells apart an omitted field from one explicitly set to null:
    // `None` means the field was omitted, `Some(None)` means it was set to null.
    // See https://github.com/serde-rs/serde/issues/984
    #[serde(default, deserialize_with = "deserialize_some")]
    pub timestamp: Option<Option<Timestamp>>,
}
1243
1244// Any value that is present is considered Some value, including null.
1245fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
1246where
1247    T: Deserialize<'de>,
1248    D: Deserializer<'de>,
1249{
1250    Deserialize::deserialize(deserializer).map(Some)
1251}
1252
/// This struct is emitted as part of a transactional produce, and contains the upper frontier of
/// the batch committed. It is used to recover the frontier a sink needs to resume at.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ProgressRecord {
    // The upper frontier of the committed batch, serialized as a plain list of timestamps.
    #[serde(
        deserialize_with = "deserialize_frontier",
        serialize_with = "serialize_frontier"
    )]
    pub frontier: Antichain<Timestamp>,
    // Defaults to 0 when the field is absent, e.g. for records converted from the legacy format.
    #[serde(default)]
    pub version: u64,
}
1265fn serialize_frontier<S>(frontier: &Antichain<Timestamp>, serializer: S) -> Result<S::Ok, S::Error>
1266where
1267    S: Serializer,
1268{
1269    Serialize::serialize(frontier.elements(), serializer)
1270}
1271
1272fn deserialize_frontier<'de, D>(deserializer: D) -> Result<Antichain<Timestamp>, D::Error>
1273where
1274    D: Deserializer<'de>,
1275{
1276    let times: Vec<Timestamp> = Deserialize::deserialize(deserializer)?;
1277    Ok(Antichain::from(times))
1278}
1279
1280fn parse_progress_record(payload: &[u8]) -> Result<ProgressRecord, anyhow::Error> {
1281    Ok(match serde_json::from_slice::<ProgressRecord>(payload) {
1282        Ok(progress) => progress,
1283        // If we fail to deserialize we might be reading a legacy progress record
1284        Err(_) => match serde_json::from_slice::<LegacyProgressRecord>(payload) {
1285            Ok(LegacyProgressRecord {
1286                timestamp: Some(Some(time)),
1287            }) => ProgressRecord {
1288                frontier: Antichain::from_elem(time.step_forward()),
1289                version: 0,
1290            },
1291            Ok(LegacyProgressRecord {
1292                timestamp: Some(None),
1293            }) => ProgressRecord {
1294                frontier: Antichain::new(),
1295                version: 0,
1296            },
1297            _ => match std::str::from_utf8(payload) {
1298                Ok(payload) => bail!("invalid progress record: {payload}"),
1299                Err(_) => bail!("invalid progress record bytes: {payload:?}"),
1300            },
1301        },
1302    })
1303}
1304
1305/// Fetches the partition count for the identified topic.
1306async fn fetch_partition_count(
1307    producer: &ThreadedProducer<TunnelingClientContext<MzClientContext>>,
1308    sink_id: GlobalId,
1309    topic_name: &str,
1310) -> Result<u64, anyhow::Error> {
1311    let meta = task::spawn_blocking(|| format!("kafka_sink_fetch_partition_count:{sink_id}"), {
1312        let producer = producer.clone();
1313        move || {
1314            producer
1315                .client()
1316                .fetch_metadata(None, DEFAULT_FETCH_METADATA_TIMEOUT)
1317        }
1318    })
1319    .await
1320    .check_ssh_status(producer.context())?;
1321
1322    match meta.topics().iter().find(|t| t.name() == topic_name) {
1323        Some(topic) => {
1324            let partition_count = u64::cast_from(topic.partitions().len());
1325            if partition_count == 0 {
1326                bail!("topic {topic_name} has an impossible partition count of zero");
1327            }
1328            Ok(partition_count)
1329        }
1330        None => bail!("topic {topic_name} does not exist"),
1331    }
1332}
1333
1334/// Fetches the partition count for the identified topic at the specified
1335/// interval.
1336///
1337/// When an updated partition count is discovered, invokes
1338/// `update_partition_count` with the new partition count.
1339async fn fetch_partition_count_loop<F>(
1340    producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
1341    sink_id: GlobalId,
1342    topic_name: String,
1343    interval: Duration,
1344    update_partition_count: Arc<F>,
1345) where
1346    F: Fn(u64),
1347{
1348    let mut interval = time::interval(interval);
1349    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
1350    loop {
1351        interval.tick().await;
1352        match fetch_partition_count(&producer, sink_id, &topic_name).await {
1353            Ok(pc) => update_partition_count(pc),
1354            Err(e) => {
1355                warn!(%sink_id, "failed updating partition count: {e}");
1356                continue;
1357            }
1358        };
1359    }
1360}
1361
/// Encodes a stream of `(Option<Row>, DiffPair<Row>)` updates using the specified encoder.
///
/// Input [`Row`] updates must be compatible with the given implementor of [`Encode`].
///
/// Returns the collection of encoded [`KafkaMessage`]s, a stream of health status
/// messages that reports any encoder-initialization failure (e.g. publishing schemas
/// to the schema registry), and a button that shuts the operator down when dropped.
fn encode_collection<'scope, T: timely::progress::Timestamp>(
    name: String,
    input: VecCollection<'scope, T, (Option<Row>, DiffPair<Row>), Diff>,
    envelope: SinkEnvelope,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
) -> (
    VecCollection<'scope, T, KafkaMessage, Diff>,
    StreamVec<'scope, T, HealthStatusMessage>,
    PressOnDropButton,
) {
    let mut builder = AsyncOperatorBuilder::new(name, input.inner.scope());

    let (output, stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let mut input = builder.new_input_for(input.inner, Pipeline, &output);

    let (button, errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let key_desc = connection
                .key_desc_and_indices
                .as_ref()
                .map(|(desc, _indices)| desc.clone());
            let value_desc = connection.value_desc;

            // Build the optional key encoder from the sink's declared key format. A
            // key descriptor and a key format must be either both present or both
            // absent; any mismatch is rejected as an error below.
            let key_encoder: Option<Box<dyn Encode>> =
                match (key_desc, connection.format.key_format) {
                    (Some(desc), Some(KafkaSinkFormatType::Bytes)) => {
                        Some(Box::new(BinaryEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Text)) => {
                        Some(Box::new(TextEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Json)) => {
                        Some(Box::new(JsonEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Avro {
                        schema,
                        compatibility_level,
                        csr_connection,
                    })) => {
                        // Ensure that schemas are registered with the schema registry.
                        //
                        // Note that where this lies in the rendering cycle means that we will publish the
                        // schemas each time the sink is rendered.
                        let ccsr = csr_connection
                            .connect(&storage_configuration, InTask::Yes)
                            .await?;

                        let schema_id = mz_storage_client::sink::publish_kafka_schema(
                            ccsr,
                            format!("{}-key", connection.topic),
                            schema.clone(),
                            mz_ccsr::SchemaType::Avro,
                            compatibility_level,
                        )
                        .await
                        .context("error publishing kafka schemas for sink")?;

                        Some(Box::new(AvroEncoder::new(desc, false, &schema, schema_id)))
                    }
                    (None, None) => None,
                    (desc, format) => {
                        return Err(anyhow!(
                            "key_desc and key_format must be both set or both unset, but key_desc: {:?}, key_format: {:?}",
                            desc,
                            format
                        ))
                    }
                };

            // Whether to apply the Debezium envelope to the value encoding.
            let debezium = matches!(envelope, SinkEnvelope::Debezium);

            let value_encoder: Box<dyn Encode> = match connection.format.value_format {
                KafkaSinkFormatType::Bytes => Box::new(BinaryEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Text => Box::new(TextEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Json => Box::new(JsonEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level,
                    csr_connection,
                } => {
                    // Ensure that schemas are registered with the schema registry.
                    //
                    // Note that where this lies in the rendering cycle means that we will publish the
                    // schemas each time the sink is rendered.
                    let ccsr = csr_connection
                        .connect(&storage_configuration, InTask::Yes)
                        .await?;

                    let schema_id = mz_storage_client::sink::publish_kafka_schema(
                        ccsr,
                        format!("{}-value", connection.topic),
                        schema.clone(),
                        mz_ccsr::SchemaType::Avro,
                        compatibility_level,
                    )
                    .await
                    .context("error publishing kafka schemas for sink")?;

                    Box::new(AvroEncoder::new(value_desc, debezium, &schema, schema_id))
                }
            };

            // !IMPORTANT!
            // Correctness of this operator relies on no fallible operations happening after this
            // point. This is a temporary workaround of build_fallible's bad interaction of owned
            // capabilities and errors.
            // TODO(petrosagg): Make the fallible async operator safe
            *capset = CapabilitySet::new();

            // Buffers reused across records to avoid per-record allocations.
            let mut row_buf = Row::default();
            let mut datums = DatumVec::new();

            // Main encode loop: transform each input update into a `KafkaMessage`.
            while let Some(event) = input.next().await {
                if let Event::Data(cap, rows) = event {
                    for ((key, value), time, diff) in rows {
                        let mut hash = None;
                        let mut headers = vec![];
                        if connection.headers_index.is_some() || connection.partition_by.is_some() {
                            // Header values and partition by values are derived from the row that
                            // produces an event. But it is ambiguous whether to use the `before` or
                            // `after` from the event. The rule applied here is simple: use `after`
                            // if it exists (insertions and updates), otherwise fall back to `before`
                            // (deletions).
                            //
                            // It is up to the SQL planner to ensure this produces sensible results.
                            // (When using the upsert envelope and both `before` and `after` are
                            // present, it's always unambiguous to use `after` because that's all
                            // that will be present in the Kafka message; when using the Debezium
                            // envelope, it's okay to refer to columns in the key because those
                            // are guaranteed to be the same in both `before` and `after`.)
                            let row = value
                                .after
                                .as_ref()
                                .or(value.before.as_ref())
                                .expect("one of before or after must be set");
                            let row = datums.borrow_with(row);

                            if let Some(i) = connection.headers_index {
                                headers = encode_headers(row[i]);
                            }

                            if let Some(partition_by) = &connection.partition_by {
                                hash = Some(evaluate_partition_by(partition_by, &row));
                            }
                        }
                        // Encode the key, if any. The partition hash comes from the
                        // `partition_by` expression when one was evaluated above,
                        // otherwise from a hash of the encoded key, otherwise 0.
                        let (key, hash) = match key {
                            Some(key) => {
                                let key_encoder = key_encoder.as_ref().expect("key present");
                                let key = key_encoder.encode_unchecked(key);
                                let hash = hash.unwrap_or_else(|| key_encoder.hash(&key));
                                (Some(key), hash)
                            }
                            None => (None, hash.unwrap_or(0))
                        };
                        // Compute the value to encode: upsert sinks emit the `after`
                        // value directly, while Debezium sinks render the before/after
                        // pair into a change event via `dbz_format`.
                        let value = match envelope {
                            SinkEnvelope::Upsert => value.after,
                            SinkEnvelope::Debezium => {
                                dbz_format(&mut row_buf.packer(), value);
                                Some(row_buf.clone())
                            }
                            SinkEnvelope::Append => {
                                unreachable!("Append envelope is not valid for Kafka sinks")
                            }
                        };
                        let value = value.map(|value| value_encoder.encode_unchecked(value));
                        let message = KafkaMessage {
                            hash,
                            key,
                            value,
                            headers,
                        };
                        output.give(&cap, (message, time, diff));
                    }
                }
            }
            Ok::<(), anyhow::Error>(())
        })
    });

    // Surface operator errors as halting health status updates in the Kafka namespace.
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Kafka,
    });

    (stream.as_collection(), statuses, button.press_on_drop())
}
1555
1556fn encode_headers(datum: Datum) -> Vec<KafkaHeader> {
1557    let mut out = vec![];
1558    if datum.is_null() {
1559        return out;
1560    }
1561    for (key, value) in datum.unwrap_map().iter() {
1562        out.push(KafkaHeader {
1563            key: key.into(),
1564            value: match value {
1565                Datum::Null => None,
1566                Datum::String(s) => Some(s.as_bytes().to_vec()),
1567                Datum::Bytes(b) => Some(b.to_vec()),
1568                _ => panic!("encode_headers called with unexpected header value {value:?}"),
1569            },
1570        })
1571    }
1572    out
1573}
1574
1575/// Evaluates a partition by expression on the given row, returning the hash
1576/// value to use for partition assignment.
1577///
1578/// The provided expression must have type `Int32`, `Int64`, `UInt32`, or
1579/// `UInt64`. If the expression produces an error when evaluated, or if the
1580/// expression is of a signed integer type and produces a negative value, this
1581/// function returns 0.
1582fn evaluate_partition_by(partition_by: &MirScalarExpr, row: &[Datum]) -> u64 {
1583    // NOTE(benesch): The way this function converts errors and invalid values
1584    // to 0 is somewhat surpising. Ideally, we would put the sink in a
1585    // permanently errored state if the partition by expression produces an
1586    // error or invalid value. But we don't presently have a way for sinks to
1587    // report errors (see materialize#17688), so the current behavior was determined to be
1588    // the best available option. The behavior is clearly documented in the
1589    // user-facing `CREATE SINK` docs.
1590    let temp_storage = RowArena::new();
1591    match partition_by.eval(row, &temp_storage) {
1592        Ok(Datum::UInt64(u)) => u,
1593        Ok(datum) => {
1594            // If we are here the only valid type that we should be seeing is
1595            // null. Anything else is a bug in the planner.
1596            soft_assert_or_log!(datum.is_null(), "unexpected partition_by result: {datum:?}");
1597            // We treat nulls the same as we treat errors: map them to partition 0.
1598            0
1599        }
1600        Err(_) => 0,
1601    }
1602}
1603
#[cfg(test)]
mod test {
    use mz_ore::assert_err;

    use super::*;

    #[mz_ore::test]
    fn progress_record_migration() {
        // A record with neither a `timestamp` nor a `frontier` field is
        // rejected, as is an explicitly null `frontier`.
        assert_err!(parse_progress_record(br#"{}"#));
        assert_err!(parse_progress_record(br#"{"frontier":null}"#));

        // Legacy records carry a `timestamp` field: the implied frontier is
        // one past the timestamp, and a null timestamp means an empty
        // frontier. Both parse as version 0.
        assert_eq!(
            parse_progress_record(br#"{"timestamp":1}"#).unwrap(),
            ProgressRecord {
                frontier: Antichain::from_elem(2.into()),
                version: 0,
            }
        );
        assert_eq!(
            parse_progress_record(br#"{"timestamp":null}"#).unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 0,
            }
        );

        // Modern records carry an explicit `frontier` and, optionally, a
        // `version` (defaulting to 0 when absent).
        assert_eq!(
            parse_progress_record(br#"{"frontier":[1]}"#).unwrap(),
            ProgressRecord {
                frontier: Antichain::from_elem(1.into()),
                version: 0,
            }
        );
        assert_eq!(
            parse_progress_record(br#"{"frontier":[]}"#).unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 0,
            }
        );
        assert_eq!(
            parse_progress_record(br#"{"frontier":[], "version": 42}"#).unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 42,
            }
        );
    }
}