mz_storage/sink/kafka.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Code to render the sink dataflow of a [`KafkaSinkConnection`]. The dataflow consists
11//! of two operators in order to take advantage of all the available workers.
12//!
//! ```text
//!        ┏━━━━━━━━━━━━━━┓
//!        ┃   persist    ┃
//!        ┃    source    ┃
//!        ┗━━━━━━┯━━━━━━━┛
//!               │ stream of arrangement batches (trace reader dropped)
//!               │
//!        ┏━━━━━━v━━━━━━┓
//!        ┃     row     ┃ walks each batch's cursor and emits one
//!        ┃   encoder   ┃ encoded `KafkaMessage` per DiffPair
//!        ┗━━━━━━┯━━━━━━┛
//!               │ encoded data
//!               │
//!        ┏━━━━━━v━━━━━━┓
//!        ┃    kafka    ┃ (single worker)
//!        ┃    sink     ┃
//!        ┗━━┯━━━━━━━━┯━┛
//!   records │        │ uppers
//!      ╭────v──╮ ╭───v──────╮
//!      │ data  │ │ progress │ <- records and uppers are produced
//!      │ topic │ │  topic   │    transactionally to both topics
//!      ╰───────╯ ╰──────────╯
//! ```
36//!
37//! # Encoding
38//!
39//! One part of the dataflow deals with encoding the rows that we read from persist. The encoder
40//! walks the input arrangement's batches via
41//! [`mz_interchange::envelopes::for_each_diff_pair`], producing one encoded `KafkaMessage` per
42//! `DiffPair` observed at each `(key, timestamp)`. An initialization step first ensures that the
43//! schemas are published to the Schema Registry.
44//!
45//! # Sinking
46//!
47//! The other part of the dataflow, and what this module mostly deals with, is interacting with the
48//! Kafka cluster in order to transactionally commit batches (sets of records associated with a
49//! frontier). All the processing happens in a single worker and so all previously encoded records
50//! go through an exchange in order to arrive at the chosen worker. We may be able to improve this
51//! in the future by committing disjoint partitions of the key space for independent workers but
52//! for now we do the simple thing.
53//!
54//! ## Retries
55//!
56//! All of the retry logic heavy lifting is offloaded to `librdkafka` since it already implements
57//! the required behavior[1]. In particular we only ever enqueue records to its send queue and
58//! eventually call `commit_transaction` which will ensure that all queued messages are
59//! successfully delivered before the transaction is reported as committed.
60//!
//! The only error that is possible during sending is that the queue is full. We are purposefully
//! NOT handling this error and simply configure `librdkafka` with a very large queue. The reason
//! for this choice is that the only option for handling such an error ourselves would be to queue
//! the message, and there isn't a good argument for two small queues being better than one big
//! one. If we reach the queue limit we simply error out the entire sink dataflow and start over.
66//!
67//! # Error handling
68//!
69//! Both the encoding operator and the sinking operator can produce a transient error that is wired
70//! up with our health monitoring and will trigger a restart of the sink dataflow.
71//!
72//! [1]: https://github.com/confluentinc/librdkafka/blob/master/INTRODUCTION.md#message-reliability
73
74use std::cell::RefCell;
75use std::cmp::Ordering;
76use std::collections::BTreeMap;
77use std::future::Future;
78use std::rc::Rc;
79use std::sync::atomic::AtomicU64;
80use std::sync::{Arc, Weak};
81use std::time::Duration;
82
83use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
84use crate::metrics::sink::kafka::KafkaSinkMetrics;
85use crate::render::sinks::{PkViolationWarner, SinkBatchStream, SinkRender};
86use crate::statistics::SinkStatistics;
87use crate::storage_state::StorageState;
88use anyhow::{Context, anyhow, bail};
89use differential_dataflow::{AsCollection, Hashable, VecCollection};
90use futures::StreamExt;
91use maplit::btreemap;
92use mz_expr::{Eval, MirScalarExpr};
93use mz_interchange::avro::AvroEncoder;
94use mz_interchange::encode::Encode;
95use mz_interchange::envelopes::{dbz_format, for_each_diff_pair};
96use mz_interchange::json::JsonEncoder;
97use mz_interchange::text_binary::{BinaryEncoder, TextEncoder};
98use mz_kafka_util::admin::EnsureTopicConfig;
99use mz_kafka_util::client::{
100 DEFAULT_FETCH_METADATA_TIMEOUT, GetPartitionsError, MzClientContext, TimeoutConfig,
101 TunnelingClientContext,
102};
103use mz_ore::cast::CastFrom;
104use mz_ore::collections::CollectionExt;
105use mz_ore::error::ErrorExt;
106use mz_ore::future::InTask;
107use mz_ore::soft_assert_or_log;
108use mz_ore::task::{self, AbortOnDropHandle};
109use mz_persist_client::Diagnostics;
110use mz_persist_client::write::WriteHandle;
111use mz_persist_types::codec_impls::UnitSchema;
112use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
113use mz_storage_client::sink::progress_key::ProgressKey;
114use mz_storage_types::StorageDiff;
115use mz_storage_types::configuration::StorageConfiguration;
116use mz_storage_types::controller::CollectionMetadata;
117use mz_storage_types::dyncfgs::{
118 KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS, KAFKA_SINK_BATCH_NUM_MESSAGES,
119 KAFKA_SINK_BATCH_SIZE, KAFKA_SINK_MESSAGE_MAX_BYTES, SINK_ENSURE_TOPIC_CONFIG,
120 SINK_PROGRESS_SEARCH,
121};
122use mz_storage_types::errors::{ContextCreationError, ContextCreationErrorExt, DataflowError};
123use mz_storage_types::sinks::{
124 KafkaSinkConnection, KafkaSinkFormatType, SinkEnvelope, StorageSinkDesc,
125};
126use mz_storage_types::sources::SourceData;
127use mz_storage_types::wire_format::WireFormat;
128use mz_timely_util::antichain::AntichainExt;
129use mz_timely_util::builder_async::{
130 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
131};
132use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
133use rdkafka::error::KafkaError;
134use rdkafka::message::{Header, OwnedHeaders, ToBytes};
135use rdkafka::producer::{BaseRecord, Producer, ThreadedProducer};
136use rdkafka::types::RDKafkaErrorCode;
137use rdkafka::{Message, Offset, Statistics, TopicPartitionList};
138use serde::{Deserialize, Deserializer, Serialize, Serializer};
139use timely::PartialOrder;
140use timely::container::CapacityContainerBuilder;
141use timely::dataflow::StreamVec;
142use timely::dataflow::channels::pact::{Exchange, Pipeline};
143use timely::dataflow::operators::vec::{Map, ToStream};
144use timely::dataflow::operators::{CapabilitySet, Concatenate};
145use timely::progress::{Antichain, Timestamp as _};
146use tokio::sync::watch;
147use tokio::time::{self, MissedTickBehavior};
148use tracing::{debug, error, info, warn};
149
150impl<'scope> SinkRender<'scope> for KafkaSinkConnection {
151 fn get_key_indices(&self) -> Option<&[usize]> {
152 self.key_desc_and_indices
153 .as_ref()
154 .map(|(_desc, indices)| indices.as_slice())
155 }
156
157 fn get_relation_key_indices(&self) -> Option<&[usize]> {
158 self.relation_key_indices.as_deref()
159 }
160
161 fn render_sink(
162 &self,
163 storage_state: &mut StorageState,
164 sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
165 sink_id: GlobalId,
166 batches: SinkBatchStream<'scope>,
167 key_is_synthetic: bool,
168 // TODO(benesch): errors should stream out through the sink,
169 // if we figure out a protocol for that.
170 _err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
171 ) -> (
172 StreamVec<'scope, Timestamp, HealthStatusMessage>,
173 Vec<PressOnDropButton>,
174 ) {
175 let scope = batches.scope();
176
177 let write_handle = {
178 let persist = Arc::clone(&storage_state.persist_clients);
179 let shard_meta = sink.to_storage_metadata.clone();
180 async move {
181 let client = persist.open(shard_meta.persist_location).await?;
182 let handle = client
183 .open_writer(
184 shard_meta.data_shard,
185 Arc::new(shard_meta.relation_desc),
186 Arc::new(UnitSchema),
187 Diagnostics::from_purpose("sink handle"),
188 )
189 .await?;
190 Ok(handle)
191 }
192 };
193
194 let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
195 storage_state
196 .sink_write_frontiers
197 .insert(sink_id, Rc::clone(&write_frontier));
198
199 let (encoded, encode_status, encode_token) = encode_collection(
200 format!("kafka-{sink_id}-{}-encode", self.format.get_format_name()),
201 batches,
202 sink.envelope,
203 self.clone(),
204 storage_state.storage_configuration.clone(),
205 sink_id,
206 sink.from,
207 key_is_synthetic,
208 );
209
210 let metrics = storage_state.metrics.get_kafka_sink_metrics(sink_id);
211 let statistics = storage_state
212 .aggregated_statistics
213 .get_sink(&sink_id)
214 .expect("statistics initialized")
215 .clone();
216
217 let (sink_status, sink_token) = sink_collection(
218 format!("kafka-{sink_id}-sink"),
219 encoded,
220 sink_id,
221 self.clone(),
222 storage_state.storage_configuration.clone(),
223 sink,
224 metrics,
225 statistics,
226 write_handle,
227 write_frontier,
228 );
229
230 let running_status = Some(HealthStatusMessage {
231 id: None,
232 update: HealthStatusUpdate::Running,
233 namespace: StatusNamespace::Kafka,
234 })
235 .to_stream(scope);
236
237 let status = scope.concatenate([running_status, encode_status, sink_status]);
238
239 (status, vec![encode_token, sink_token])
240 }
241}
242
243struct TransactionalProducer {
244 /// The task name used for any blocking calls spawned onto the tokio threadpool.
245 task_name: String,
246 /// The topic where all the updates go.
247 data_topic: String,
248 /// The topic where all the upper frontiers go.
249 progress_topic: String,
250 /// The key each progress record is associated with.
251 progress_key: ProgressKey,
252 /// The version of this sink, used to fence out previous versions from writing.
253 sink_version: u64,
254 /// The number of partitions in the target topic.
255 partition_count: Arc<AtomicU64>,
256 /// A task to periodically refresh the partition count.
257 _partition_count_task: AbortOnDropHandle<()>,
258 /// The underlying Kafka producer.
259 producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
260 /// A handle to the metrics associated with this sink.
261 statistics: SinkStatistics,
262 /// The number of messages staged for the currently open transactions. It is reset to zero
263 /// every time a transaction commits.
264 staged_messages: u64,
265 /// The total number bytes staged for the currently open transactions. It is reset to zero
266 /// every time a transaction commits.
267 staged_bytes: u64,
268 /// The timeout to use for network operations.
269 socket_timeout: Duration,
270 /// The timeout to use for committing transactions.
271 transaction_timeout: Duration,
272}
273
274impl TransactionalProducer {
275 /// Initializes a transcational producer for the sink identified by `sink_id`. After this call
276 /// returns it is guranteed that all previous `TransactionalProducer` instances for the same
277 /// sink have been fenced out (i.e `init_transations()` has been called successfully).
278 async fn new(
279 sink_id: GlobalId,
280 connection: &KafkaSinkConnection,
281 storage_configuration: &StorageConfiguration,
282 metrics: Arc<KafkaSinkMetrics>,
283 statistics: SinkStatistics,
284 sink_version: u64,
285 ) -> Result<(Self, Antichain<mz_repr::Timestamp>), ContextCreationError> {
286 let client_id = connection.client_id(
287 storage_configuration.config_set(),
288 &storage_configuration.connection_context,
289 sink_id,
290 );
291 let transactional_id =
292 connection.transactional_id(&storage_configuration.connection_context, sink_id);
293
294 let timeout_config = &storage_configuration.parameters.kafka_timeout_config;
295 let mut options = BTreeMap::new();
296 // Ensure that messages are sinked in order and without duplicates. Note that this only
297 // applies to a single instance of a producer - in the case of restarts, all bets are off
298 // and full exactly once support is required.
299 options.insert("enable.idempotence", "true".into());
300 // Use the compression type requested by the user.
301 options.insert(
302 "compression.type",
303 connection.compression_type.to_librdkafka_option().into(),
304 );
305 // Set the maximum buffer size limit. We don't want to impose anything lower than the max
306 // here as the operator has nothing better to do with the data than to buffer them.
307 options.insert("queue.buffering.max.kbytes", "2147483647".into());
308 // Disable the default buffer limit of 100k messages. We don't want to impose any limit
309 // here as the operator has nothing better to do with the data than to buffer them.
310 options.insert("queue.buffering.max.messages", "0".into());
311 // Make the Kafka producer wait at least 10 ms before sending out MessageSets
312 options.insert("queue.buffering.max.ms", format!("{}", 10));
313 // Time out transactions after 60 seconds
314 options.insert(
315 "transaction.timeout.ms",
316 format!("{}", timeout_config.transaction_timeout.as_millis()),
317 );
318 // Use the transactional ID requested by the user.
319 options.insert("transactional.id", transactional_id);
320 // Allow Kafka monitoring tools to identify this producer.
321 options.insert("client.id", client_id);
322 // We want to be notified regularly with statistics
323 options.insert("statistics.interval.ms", "1000".into());
324 // Maximum size of a single produced message, controlled by dyncfg so
325 // operators can raise the limit when messages exceed librdkafka's 1MB
326 // default.
327 options.insert(
328 "message.max.bytes",
329 format!(
330 "{}",
331 KAFKA_SINK_MESSAGE_MAX_BYTES.get(storage_configuration.config_set())
332 ),
333 );
334 // Maximum size (bytes) of a produced MessageSet.
335 options.insert(
336 "batch.size",
337 format!(
338 "{}",
339 KAFKA_SINK_BATCH_SIZE.get(storage_configuration.config_set())
340 ),
341 );
342 // Maximum number of messages batched in one MessageSet.
343 options.insert(
344 "batch.num.messages",
345 format!(
346 "{}",
347 KAFKA_SINK_BATCH_NUM_MESSAGES.get(storage_configuration.config_set())
348 ),
349 );
350
351 let ctx = MzClientContext::default();
352
353 let stats_receiver = ctx.subscribe_statistics();
354 let task_name = format!("kafka_sink_metrics_collector:{sink_id}");
355 task::spawn(
356 || &task_name,
357 collect_statistics(stats_receiver, Arc::clone(&metrics)),
358 );
359
360 let producer: ThreadedProducer<_> = connection
361 .connection
362 .create_with_context(storage_configuration, ctx, &options, InTask::Yes)
363 .await?;
364
365 // The partition count is fixed up after we ensure the topic exists.
366 let partition_count = Arc::new(AtomicU64::new(0));
367 let update_partition_count = {
368 let partition_count = Arc::clone(&partition_count);
369 let metrics = Arc::clone(&metrics);
370 Arc::new(move |pc| {
371 partition_count.store(pc, std::sync::atomic::Ordering::SeqCst);
372 metrics.partition_count.set(pc);
373 })
374 };
375
376 // Start a task that will keep the partition count up to date in the
377 // background.
378 let partition_count_task = task::spawn(
379 || format!("kafka_sink_producer_fetch_metadata_loop:{sink_id}"),
380 fetch_partition_count_loop(
381 producer.clone(),
382 sink_id,
383 connection.topic.clone(),
384 connection.topic_metadata_refresh_interval,
385 Arc::clone(&update_partition_count),
386 ),
387 );
388
389 let task_name = format!("kafka_sink_producer:{sink_id}");
390 let progress_key = ProgressKey::new(sink_id);
391
392 let producer = Self {
393 task_name,
394 data_topic: connection.topic.clone(),
395 partition_count,
396 _partition_count_task: partition_count_task.abort_on_drop(),
397 progress_topic: connection
398 .progress_topic(&storage_configuration.connection_context)
399 .into_owned(),
400 progress_key,
401 sink_version,
402 producer,
403 statistics,
404 staged_messages: 0,
405 staged_bytes: 0,
406 socket_timeout: timeout_config.socket_timeout,
407 transaction_timeout: timeout_config.transaction_timeout,
408 };
409
410 let timeout = timeout_config.socket_timeout;
411 producer
412 .spawn_blocking(move |p| p.init_transactions(timeout))
413 .await?;
414
415 // We have just called init_transactions, which means that we have fenced out all previous
416 // transactional producers, making it safe to determine the resume upper.
417 let progress = determine_sink_progress(
418 sink_id,
419 connection,
420 storage_configuration,
421 Arc::clone(&metrics),
422 )
423 .await?;
424
425 let resume_upper = match progress {
426 Some(progress) => {
427 if sink_version < progress.version {
428 return Err(ContextCreationError::Other(anyhow!(
429 "Fenced off by newer version of the sink. ours={} theirs={}",
430 sink_version,
431 progress.version
432 )));
433 }
434 progress.frontier
435 }
436 None => {
437 mz_storage_client::sink::ensure_kafka_topic(
438 connection,
439 storage_configuration,
440 &connection.topic,
441 &connection.topic_options,
442 EnsureTopicConfig::Skip,
443 )
444 .await?;
445 Antichain::from_elem(Timestamp::minimum())
446 }
447 };
448
449 // At this point the topic must exist and so we can query for its
450 // partition count. Even though we have a background task to fetch the
451 // partition count, we do this synchronously to ensure we don't attempt
452 // to produce any messages with our initial partition count of 0.
453 let partition_count =
454 fetch_partition_count(&producer.producer, sink_id, &connection.topic).await?;
455 update_partition_count(partition_count);
456
457 Ok((producer, resume_upper))
458 }
459
460 /// Runs the blocking operation `f` on the producer in the tokio threadpool and checks for SSH
461 /// status in case of failure.
462 async fn spawn_blocking<F, R>(&self, f: F) -> Result<R, ContextCreationError>
463 where
464 F: FnOnce(
465 ThreadedProducer<TunnelingClientContext<MzClientContext>>,
466 ) -> Result<R, KafkaError>
467 + Send
468 + 'static,
469 R: Send + 'static,
470 {
471 let producer = self.producer.clone();
472 task::spawn_blocking(|| &self.task_name, move || f(producer))
473 .await
474 .check_ssh_status(self.producer.context())
475 }
476
477 async fn begin_transaction(&self) -> Result<(), ContextCreationError> {
478 self.spawn_blocking(|p| p.begin_transaction()).await
479 }
480
481 /// Synchronously puts the provided message to librdkafka's send queue. This method only
482 /// returns an error if the queue is full. Handling this error by buffering the message and
483 /// retrying is equivalent to adjusting the maximum number of queued items in rdkafka so it is
484 /// adviced that callers only handle this error in order to apply backpressure to the rest of
485 /// the system.
486 fn send(
487 &mut self,
488 message: &KafkaMessage,
489 time: Timestamp,
490 diff: Diff,
491 ) -> Result<(), KafkaError> {
492 assert_eq!(diff, Diff::ONE, "invalid sink update");
493
494 let mut headers = OwnedHeaders::new().insert(Header {
495 key: "materialize-timestamp",
496 value: Some(time.to_string().as_bytes()),
497 });
498 for header in &message.headers {
499 // Headers that start with `materialize-` are reserved for our
500 // internal use, so we silently drop any such user-specified
501 // headers. While this behavior is documented, it'd be a nicer UX to
502 // send a warning or error somewhere. Unfortunately sinks don't have
503 // anywhere user-visible to send errors. See database-issues#5148.
504 if header.key.starts_with("materialize-") {
505 continue;
506 }
507
508 headers = headers.insert(Header {
509 key: header.key.as_str(),
510 value: header.value.as_ref(),
511 });
512 }
513
514 let pc = self
515 .partition_count
516 .load(std::sync::atomic::Ordering::SeqCst);
517 let partition = Some(i32::try_from(message.hash % pc).unwrap());
518
519 let record = BaseRecord {
520 topic: &self.data_topic,
521 key: message.key.as_ref(),
522 payload: message.value.as_ref(),
523 headers: Some(headers),
524 partition,
525 timestamp: None,
526 delivery_opaque: (),
527 };
528 let key_size = message.key.as_ref().map(|k| k.len()).unwrap_or(0);
529 let value_size = message.value.as_ref().map(|k| k.len()).unwrap_or(0);
530 let headers_size = message
531 .headers
532 .iter()
533 .map(|h| h.key.len() + h.value.as_ref().map(|v| v.len()).unwrap_or(0))
534 .sum::<usize>();
535 let record_size = u64::cast_from(key_size + value_size + headers_size);
536 self.statistics.inc_messages_staged_by(1);
537 self.staged_messages += 1;
538 self.statistics.inc_bytes_staged_by(record_size);
539 self.staged_bytes += record_size;
540 self.producer.send(record).map_err(|(e, _)| e)
541 }
542
543 /// Commits all the staged updates of the currently open transaction plus a progress record
544 /// describing `upper` to the progress topic.
545 async fn commit_transaction(
546 &mut self,
547 upper: Antichain<Timestamp>,
548 ) -> Result<(), ContextCreationError> {
549 let progress = ProgressRecord {
550 frontier: upper,
551 version: self.sink_version,
552 };
553 let payload = serde_json::to_vec(&progress).expect("infallible");
554 let record = BaseRecord::to(&self.progress_topic)
555 .payload(&payload)
556 .key(&self.progress_key);
557 self.producer.send(record).map_err(|(e, _)| e)?;
558
559 fail::fail_point!("kafka_sink_commit_transaction");
560
561 let timeout = self.transaction_timeout;
562 match self
563 .spawn_blocking(move |p| p.commit_transaction(timeout))
564 .await
565 {
566 Ok(()) => {
567 self.statistics
568 .inc_messages_committed_by(self.staged_messages);
569 self.statistics.inc_bytes_committed_by(self.staged_bytes);
570 self.staged_messages = 0;
571 self.staged_bytes = 0;
572 Ok(())
573 }
574 Err(ContextCreationError::KafkaError(KafkaError::Transaction(err))) => {
575 // Make one attempt at aborting the transaction before letting the error percolate
576 // up and the process exit. Aborting allows the consumers of the topic to skip over
577 // any messages we've written in the transaction, so it's polite to do... but if it
578 // fails, the transaction will be aborted either when fenced out by a future
579 // version of this producer or by the broker-side timeout.
580 if err.txn_requires_abort() {
581 let timeout = self.socket_timeout;
582 self.spawn_blocking(move |p| p.abort_transaction(timeout))
583 .await?;
584 }
585 Err(ContextCreationError::KafkaError(KafkaError::Transaction(
586 err,
587 )))
588 }
589 Err(err) => Err(err),
590 }
591 }
592}
593
594/// Listens for statistics updates from librdkafka and updates our Prometheus metrics.
595async fn collect_statistics(
596 mut receiver: watch::Receiver<Statistics>,
597 metrics: Arc<KafkaSinkMetrics>,
598) {
599 while receiver.changed().await.is_ok() {
600 // The librdkafka per-broker statistics are either running totals (counters)
601 // or point-in-time values (gauges). In both cases we aggregate across brokers
602 // by re-summing from scratch each interval, so these accumulators are reset
603 // every time statistics are fetched.
604 //
605 // see <https://docs.confluent.io/platform/current/clients/librdkafka/html/md_STATISTICS.html#autotoc_md121>
606 let mut outbuf_cnt: i64 = 0;
607 let mut outbuf_msg_cnt: i64 = 0;
608 let mut waitresp_cnt: i64 = 0;
609 let mut waitresp_msg_cnt: i64 = 0;
610 let mut txerrs: u64 = 0;
611 let mut txretries: u64 = 0;
612 let mut req_timeouts: u64 = 0;
613 let mut connects: i64 = 0;
614 let mut disconnects: i64 = 0;
615
616 let stats = receiver.borrow();
617 for broker in stats.brokers.values() {
618 outbuf_cnt += broker.outbuf_cnt;
619 outbuf_msg_cnt += broker.outbuf_msg_cnt;
620 waitresp_cnt += broker.waitresp_cnt;
621 waitresp_msg_cnt += broker.waitresp_msg_cnt;
622 txerrs += broker.txerrs;
623 txretries += broker.txretries;
624 req_timeouts += broker.req_timeouts;
625 connects += broker.connects.unwrap_or(0);
626 disconnects += broker.disconnects.unwrap_or(0);
627 }
628 metrics.rdkafka_msg_cnt.set(stats.msg_cnt);
629 metrics.rdkafka_msg_size.set(stats.msg_size);
630 metrics.rdkafka_txmsgs.set(stats.txmsgs);
631 metrics.rdkafka_txmsg_bytes.set(stats.txmsg_bytes);
632 metrics.rdkafka_tx.set(stats.tx);
633 metrics.rdkafka_tx_bytes.set(stats.tx_bytes);
634 metrics.rdkafka_outbuf_cnt.set(outbuf_cnt);
635 metrics.rdkafka_outbuf_msg_cnt.set(outbuf_msg_cnt);
636 metrics.rdkafka_waitresp_cnt.set(waitresp_cnt);
637 metrics.rdkafka_waitresp_msg_cnt.set(waitresp_msg_cnt);
638 metrics.rdkafka_txerrs.set(txerrs);
639 metrics.rdkafka_txretries.set(txretries);
640 metrics.rdkafka_req_timeouts.set(req_timeouts);
641 metrics.rdkafka_connects.set(connects);
642 metrics.rdkafka_disconnects.set(disconnects);
643 }
644}
645
646/// A message to produce to Kafka.
647#[derive(Debug, Clone, Serialize, Deserialize)]
648struct KafkaMessage {
649 /// A hash of the key that can be used for partitioning.
650 hash: u64,
651 /// The message key.
652 key: Option<Vec<u8>>,
653 /// The message value.
654 value: Option<Vec<u8>>,
655 /// Message headers.
656 headers: Vec<KafkaHeader>,
657}
658
659/// A header to attach to a Kafka message.
660#[derive(Debug, Clone, Serialize, Deserialize)]
661struct KafkaHeader {
662 /// The header key.
663 key: String,
664 /// The header value.
665 value: Option<Vec<u8>>,
666}
667
668/// Sinks a collection of encoded rows to Kafka.
669///
670/// This operator exchanges all updates to a single worker by hashing on the given sink `id`.
671///
672/// Updates are sent in ascending timestamp order.
673fn sink_collection<'scope>(
674 name: String,
675 input: VecCollection<'scope, Timestamp, KafkaMessage, Diff>,
676 sink_id: GlobalId,
677 connection: KafkaSinkConnection,
678 storage_configuration: StorageConfiguration,
679 sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
680 metrics: KafkaSinkMetrics,
681 statistics: SinkStatistics,
682 write_handle: impl Future<
683 Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
684 > + 'static,
685 write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
686) -> (
687 StreamVec<'scope, Timestamp, HealthStatusMessage>,
688 PressOnDropButton,
689) {
690 let scope = input.scope();
691 let mut builder = AsyncOperatorBuilder::new(name.clone(), input.inner.scope());
692
693 // We want exactly one worker to send all the data to the sink topic.
694 let hashed_id = sink_id.hashed();
695 let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
696 let buffer_min_capacity =
697 KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS.handle(storage_configuration.config_set());
698
699 let mut input = builder.new_disconnected_input(input.inner, Exchange::new(move |_| hashed_id));
700
701 let as_of = sink.as_of.clone();
702 let sink_version = sink.version;
703 let (button, errors) = builder.build_fallible(move |_caps| {
704 Box::pin(async move {
705 if !is_active_worker {
706 write_frontier.borrow_mut().clear();
707 return Ok(());
708 }
709
710 fail::fail_point!("kafka_sink_creation_error", |_| Err(
711 ContextCreationError::Other(anyhow::anyhow!("synthetic error"))
712 ));
713
714 let mut write_handle = write_handle.await?;
715
716 let metrics = Arc::new(metrics);
717
718 let (mut producer, resume_upper) = TransactionalProducer::new(
719 sink_id,
720 &connection,
721 &storage_configuration,
722 Arc::clone(&metrics),
723 statistics,
724 sink_version,
725 )
726 .await?;
727
728 // The input has overcompacted if
729 let overcompacted =
730 // ..we have made some progress in the past
731 *resume_upper != [Timestamp::minimum()] &&
732 // ..but the since frontier is now beyond that
733 !PartialOrder::less_equal(&as_of, &resume_upper);
734 if overcompacted {
735 let err = format!(
736 "{name}: input compacted past resume upper: as_of {}, resume_upper: {}",
737 as_of.pretty(),
738 resume_upper.pretty()
739 );
740 // This would normally be an assertion but because it can happen after a
741 // Materialize backup/restore we log an error so that it appears on Sentry but
742 // leaves the rest of the objects in the cluster unaffected.
743 error!("{err}");
744 return Err(anyhow!("{err}").into());
745 }
746
747 info!(
748 "{name}: as_of: {}, resume upper: {}",
749 as_of.pretty(),
750 resume_upper.pretty()
751 );
752
753 // The section below relies on TotalOrder for correctness so we'll work with timestamps
754 // directly to make sure this doesn't compile if someone attempts to make this operator
755 // generic over partial orders in the future.
756 let Some(mut upper) = resume_upper.clone().into_option() else {
757 write_frontier.borrow_mut().clear();
758 return Ok(());
759 };
760
761 let mut deferred_updates = vec![];
762 let mut extra_updates = vec![];
763 // We must wait until we have data to commit before starting a transaction because
764 // Kafka doesn't have a heartbeating mechanism to keep a transaction open indefinitely.
765 // This flag tracks whether we have started the transaction.
766 let mut transaction_begun = false;
767 while let Some(event) = input.next().await {
768 match event {
769 Event::Data(_cap, batch) => {
770 for (message, time, diff) in batch {
771 // We want to publish updates in time order and we know that we have
772 // already committed all times not beyond `upper`. Therefore, if this
773 // update happens *exactly* at upper then it is the minimum pending
774 // time and so emitting it now will not violate the timestamp publish
775 // order. This optimization is load bearing because it is the mechanism
776 // by which we incrementally stream the initial snapshot out to Kafka
777 // instead of buffering it all in memory first. This argument doesn't
778 // hold for partially ordered time because many different timestamps
779 // can be *exactly* at upper but we can't know ahead of time which one
780 // will be advanced in the next progress message.
781 match upper.cmp(&time) {
782 Ordering::Less => deferred_updates.push((message, time, diff)),
783 Ordering::Equal => {
784 if !transaction_begun {
785 producer.begin_transaction().await?;
786 transaction_begun = true;
787 }
788 producer.send(&message, time, diff)?;
789 }
790 Ordering::Greater => continue,
791 }
792 }
793 }
794 Event::Progress(progress) => {
795 // Ignore progress updates before our resumption frontier
796 if !PartialOrder::less_equal(&resume_upper, &progress) {
797 continue;
798 }
799 // Also ignore progress updates until we are past the as_of frontier. This
800 // is to avoid the following pathological scenario:
801 // 1. Sink gets instantiated with an as_of = {10}, resume_upper = {0}.
802 // `progress` initially jumps at {10}, then the snapshot appears at time
803 // 10.
804 // 2. `progress` would normally advance to say {11} and we would commit the
805 // snapshot but clusterd crashes instead.
806 // 3. A new cluster restarts the sink with an earlier as_of, say {5}. This
807 // is valid, the earlier as_of has strictly more information. The
808 // snapshot now appears at time 5.
809 //
810 // If we were to commit an empty transaction in step 1 and advanced the
811 // resume_upper to {10} then in step 3 we would ignore the snapshot that
812 // now appears at 5 completely. So it is important to only start committing
813 // transactions after we're strictly beyond the as_of.
814 // TODO(petrosagg): is this logic an indication of us holding something
815 // wrong elsewhere? Investigate.
816 // Note: !PartialOrder::less_than(as_of, progress) would not be equivalent
817 // nor correct for partially ordered times.
818 if !as_of.iter().all(|t| !progress.less_equal(t)) {
819 continue;
820 }
821 if !transaction_begun {
822 producer.begin_transaction().await?;
823 }
824
825 extra_updates.extend(
826 deferred_updates
827 .extract_if(.., |(_, time, _)| !progress.less_equal(time)),
828 );
829 // Shrink after draining items out, so the call actually
830 // reduces capacity in the oversized-buffer scenario
831 // (e.g. progress topic was deleted and resume upper is 0).
832 deferred_updates.shrink_to(buffer_min_capacity.get());
833 extra_updates.sort_unstable_by(|a, b| a.1.cmp(&b.1));
834
835 for (message, time, diff) in extra_updates.drain(..) {
836 producer.send(&message, time, diff)?;
837 }
838 extra_updates.shrink_to(buffer_min_capacity.get());
839
840 debug!("{name}: committing transaction for {}", progress.pretty());
841 producer.commit_transaction(progress.clone()).await?;
842 transaction_begun = false;
843 let mut expect_upper = write_handle.shared_upper();
844 loop {
845 if PartialOrder::less_equal(&progress, &expect_upper) {
846 // The frontier has already been advanced as far as necessary.
847 break;
848 }
849 // TODO(sinks): include the high water mark in the output topic for
850 // the messages we've published, if and when we allow reads to the sink
851 // directly, to allow monitoring the progress of the sink in terms of
852 // the output system.
853 const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
854 match write_handle
855 .compare_and_append(EMPTY, expect_upper, progress.clone())
856 .await
857 .expect("valid usage")
858 {
859 Ok(()) => break,
860 Err(mismatch) => {
861 expect_upper = mismatch.current;
862 }
863 }
864 }
865 write_frontier.borrow_mut().clone_from(&progress);
866 match progress.into_option() {
867 Some(new_upper) => upper = new_upper,
868 None => break,
869 }
870 }
871 }
872 }
873 Ok(())
874 })
875 });
876
877 let statuses = errors.map(|error: Rc<ContextCreationError>| {
878 let hint = match *error {
879 ContextCreationError::KafkaError(KafkaError::Transaction(ref e)) => {
880 if e.is_retriable() && e.code() == RDKafkaErrorCode::OperationTimedOut {
881 let hint = "If you're running a single Kafka broker, ensure that the configs \
882 transaction.state.log.replication.factor, transaction.state.log.min.isr, \
883 and offsets.topic.replication.factor are set to 1 on the broker";
884 Some(hint.to_owned())
885 } else {
886 None
887 }
888 }
889 _ => None,
890 };
891
892 HealthStatusMessage {
893 id: None,
894 update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), hint),
895 namespace: if matches!(*error, ContextCreationError::Ssh(_)) {
896 StatusNamespace::Ssh
897 } else {
898 StatusNamespace::Kafka
899 },
900 }
901 });
902
903 (statuses, button.press_on_drop())
904}
905
/// Determines the latest progress record from the specified topic for the given
/// progress key.
///
/// The progress key is derived from `sink_id`. The function creates one
/// `read_committed` and one `read_uncommitted` consumer, invokes
/// `ensure_kafka_topic` for the progress topic according to the
/// `SINK_ENSURE_TOPIC_CONFIG` setting, and then scans the topic's single
/// partition on a blocking task.
///
/// Returns `Ok(None)` when the progress topic does not exist or when no
/// progress record for this sink's key is found in the scanned range.
///
/// IMPORTANT: to achieve exactly once guarantees, the producer that will resume
/// production at the returned timestamp *must* have called `init_transactions`
/// prior to calling this method.
async fn determine_sink_progress(
    sink_id: GlobalId,
    connection: &KafkaSinkConnection,
    storage_configuration: &StorageConfiguration,
    metrics: Arc<KafkaSinkMetrics>,
) -> Result<Option<ProgressRecord>, ContextCreationError> {
    // ****************************** WARNING ******************************
    // Be VERY careful when editing the code in this function. It is very easy
    // to accidentally introduce a correctness or liveness bug when refactoring
    // this code.
    // ****************************** WARNING ******************************

    let TimeoutConfig {
        fetch_metadata_timeout,
        progress_record_fetch_timeout,
        ..
    } = storage_configuration.parameters.kafka_timeout_config;

    let client_id = connection.client_id(
        storage_configuration.config_set(),
        &storage_configuration.connection_context,
        sink_id,
    );
    let group_id = connection.progress_group_id(&storage_configuration.connection_context, sink_id);
    let progress_topic = connection
        .progress_topic(&storage_configuration.connection_context)
        .into_owned();
    let progress_topic_options = &connection.connection.progress_topic_options;
    let progress_key = ProgressKey::new(sink_id);

    // Options shared by both consumers; only the isolation level differs between them.
    let common_options = btreemap! {
        // Consumer group ID, which may have been overridden by the user. librdkafka requires this,
        // even though we'd prefer to disable the consumer group protocol entirely.
        "group.id" => group_id,
        // Allow Kafka monitoring tools to identify this consumer.
        "client.id" => client_id,
        "enable.auto.commit" => "false".into(),
        "auto.offset.reset" => "earliest".into(),
        // The fetch loop below needs EOF notifications to reliably detect that we have reached the
        // high watermark.
        "enable.partition.eof" => "true".into(),
    };

    // Construct two clients in read committed and read uncommitted isolations respectively. See
    // comment below for an explanation on why we need it.
    let progress_client_read_committed: BaseConsumer<_> = {
        let mut opts = common_options.clone();
        opts.insert("isolation.level", "read_committed".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    let progress_client_read_uncommitted: BaseConsumer<_> = {
        let mut opts = common_options;
        opts.insert("isolation.level", "read_uncommitted".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    // Keep a handle on the client context so that the result of the blocking task can be
    // passed through `check_ssh_status` once the consumers have been moved into the task.
    let ctx = Arc::clone(progress_client_read_committed.client().context());

    // Ensure the progress topic exists.
    let ensure_topic_config =
        match &*SINK_ENSURE_TOPIC_CONFIG.get(storage_configuration.config_set()) {
            "skip" => EnsureTopicConfig::Skip,
            "check" => EnsureTopicConfig::Check,
            "alter" => EnsureTopicConfig::Alter,
            _ => {
                tracing::warn!(
                    topic = progress_topic,
                    "unexpected value for ensure-topic-config; skipping checks"
                );
                EnsureTopicConfig::Skip
            }
        };
    mz_storage_client::sink::ensure_kafka_topic(
        connection,
        storage_configuration,
        &progress_topic,
        progress_topic_options,
        ensure_topic_config,
    )
    .await
    .add_context("error registering kafka progress topic for sink")?;

    // We are about to spawn a blocking task that cannot be aborted by simply calling .abort() on
    // its handle but we must be able to cancel it promptly so as to not leave long running
    // operations around when interest to this task is lost. To accomplish this we create a shared
    // token of which a weak reference is given to the task and a strong reference is held by the
    // parent task. The task periodically checks if its weak reference is still valid before
    // continuing its work.
    let parent_token = Arc::new(());
    let child_token = Arc::downgrade(&parent_token);
    let task_name = format!("get_latest_ts:{sink_id}");
    let sink_progress_search = SINK_PROGRESS_SEARCH.get(storage_configuration.config_set());
    let result = task::spawn_blocking(|| task_name, move || {
        let progress_topic = progress_topic.as_ref();
        // Ensure the progress topic has exactly one partition. Kafka only
        // guarantees ordering within a single partition, and we need a strict
        // order on the progress messages we read and write.
        let partitions = match mz_kafka_util::client::get_partitions(
            progress_client_read_committed.client(),
            progress_topic,
            fetch_metadata_timeout,
        ) {
            Ok(partitions) => partitions,
            Err(GetPartitionsError::TopicDoesNotExist) => {
                // The progress topic doesn't exist, which indicates there is
                // no committed timestamp.
                return Ok(None);
            }
            e => e.with_context(|| {
                format!(
                    "Unable to fetch metadata about progress topic {}",
                    progress_topic
                )
            })?,
        };
        if partitions.len() != 1 {
            bail!(
                "Progress topic {} should contain a single partition, but instead contains {} partitions",
                progress_topic, partitions.len(),
            );
        }
        let partition = partitions.into_element();

        // We scan from the beginning and see if we can find a progress record. We have
        // to do it like this because Kafka Control Batches mess with offsets. We
        // therefore cannot simply take the last offset from the back and expect a
        // progress message there. With a transactional producer, the OffsetTail(1) will
        // not point to a progress message but a control message. With aborted
        // transactions, there might even be a lot of garbage at the end of the
        // topic or in between.

        metrics.consumed_progress_records.set(0);

        // First, determine the current high water mark for the progress topic.
        // This is the position our `progress_client` consumer *must* reach
        // before we can conclude that we've seen the latest progress record for
        // the specified `progress_key`. A safety argument:
        //
        //   * Our caller has initialized transactions before calling this
        //     method, which prevents the prior incarnation of this sink from
        //     committing any further progress records.
        //
        //   * We use `read_uncommitted` isolation to ensure that we fetch the
        //     true high water mark for the topic, even if there are pending
        //     transactions in the topic. If we used the `read_committed`
        //     isolation level, we'd instead get the "last stable offset" (LSO),
        //     which is the offset of the first message in an open transaction,
        //     which might not include the last progress message committed for
        //     this sink! (While the caller of this function has fenced out
        //     older producers for this sink, *other* sinks writing using the
        //     same progress topic might have long-running transactions that
        //     hold back the LSO.)
        //
        //   * If another sink spins up and fences out the producer for this
        //     incarnation of the sink, we may not see the latest progress
        //     record... but since the producer has been fenced out, it will be
        //     unable to act on our stale information.
        //
        let (lo, hi) = progress_client_read_uncommitted
            .fetch_watermarks(progress_topic, partition, fetch_metadata_timeout)
            .map_err(|e| {
                anyhow!(
                    "Failed to fetch metadata while reading from progress topic: {}",
                    e
                )
            })?;

        // This topic might be long, but the desired offset will usually be right near the end.
        // Instead of always scanning through the entire topic, we scan through exponentially-growing
        // suffixes of it. (Because writes are ordered, the largest progress record in any suffix,
        // if present, is the global max.) If we find it in one of our suffixes, we've saved at least
        // an order of magnitude of work; if we don't, we've added at most a constant factor.
        //
        // `start_indices` is built from the full range towards ever shorter suffixes and then
        // consumed in reverse, so the shortest suffix is searched first and the full range last.
        let mut start_indices = vec![lo];
        if sink_progress_search {
            let mut lookback = hi.saturating_sub(lo) / 10;
            while lookback >= 20_000 {
                start_indices.push(hi - lookback);
                lookback /= 10;
            }
        }
        for lo in start_indices.into_iter().rev() {
            if let Some(found) = progress_search(
                &progress_client_read_committed,
                progress_record_fetch_timeout,
                progress_topic,
                partition,
                lo,
                hi,
                progress_key.clone(),
                Weak::clone(&child_token),
                Arc::clone(&metrics)
            )? {
                return Ok(Some(found));
            }
        }
        Ok(None)
    }).await.check_ssh_status(&ctx);
    // Express interest to the computation until after we've received its result
    drop(parent_token);
    result
}
1122
/// Scans the progress topic partition starting at offset `lo` until the consumer's position
/// reaches `hi`, and returns the last progress record seen for `progress_key`, if any.
///
/// Returns an error when the search is cancelled (no strong reference remains for
/// `child_token`), when polling fails or times out before reaching `hi`, when a record for
/// `progress_key` cannot be parsed, or when two successive records for `progress_key` have
/// frontiers that regress.
///
/// As a side effect, updates the `outstanding_progress_records` and
/// `consumed_progress_records` metrics.
fn progress_search<C: ConsumerContext + 'static>(
    progress_client_read_committed: &BaseConsumer<C>,
    progress_record_fetch_timeout: Duration,
    progress_topic: &str,
    partition: i32,
    lo: i64,
    hi: i64,
    progress_key: ProgressKey,
    child_token: Weak<()>,
    metrics: Arc<KafkaSinkMetrics>,
) -> anyhow::Result<Option<ProgressRecord>> {
    // Seek to the beginning of the given range in the progress topic.
    let mut tps = TopicPartitionList::new();
    tps.add_partition(progress_topic, partition);
    tps.set_partition_offset(progress_topic, partition, Offset::Offset(lo))?;
    progress_client_read_committed
        .assign(&tps)
        .with_context(|| {
            format!(
                "Error seeking in progress topic {}:{}",
                progress_topic, partition
            )
        })?;

    // Helper to get the progress consumer's current position. It also doubles as the
    // cancellation check: it fails once the parent has dropped its strong token.
    let get_position = || {
        if child_token.strong_count() == 0 {
            bail!("operation cancelled");
        }
        let position = progress_client_read_committed
            .position()?
            .find_partition(progress_topic, partition)
            .ok_or_else(|| {
                anyhow!(
                    "No position info found for progress topic {}",
                    progress_topic
                )
            })?
            .offset();
        let position = match position {
            Offset::Offset(position) => position,
            // An invalid offset indicates the consumer has not yet read a
            // message. Since we assigned the consumer to offset `lo`, it's
            // safe to return `lo` here, which indicates the position before
            // the first possible message in the searched range.
            //
            // Note that it's important to return `lo` and not the minimum
            // possible offset (i.e., zero) in order to break out of the loop
            // if the topic is empty but the low water mark is greater than
            // zero.
            Offset::Invalid => lo,
            _ => bail!(
                "Consumer::position returned offset of wrong type: {:?}",
                position
            ),
        };
        // Record the outstanding number of progress records that remain to be processed
        let outstanding = u64::try_from(std::cmp::max(0, hi - position)).unwrap();
        metrics.outstanding_progress_records.set(outstanding);
        Ok(position)
    };

    info!("fetching latest progress record for {progress_key}, lo/hi: {lo}/{hi}");

    // Read messages until the consumer is positioned at or beyond the high
    // water mark.
    //
    // We use `read_committed` isolation to ensure we don't see progress
    // records for transactions that did not commit. This means we have to
    // wait for the LSO to progress to the high water mark `hi`, which means
    // waiting for any open transactions for other sinks using the same
    // progress topic to complete. We set a short transaction timeout (10s)
    // to ensure we never need to wait more than 10s.
    //
    // Note that the stall time on the progress topic is not a function of
    // transaction size. We've designed our transactions so that the
    // progress record is always written last, after all the data has been
    // written, and so the window of time in which the progress topic has an
    // open transaction is quite small. The only vulnerability is if another
    // sink using the same progress topic crashes in that small window
    // between writing the progress record and committing the transaction,
    // in which case we have to wait out the transaction timeout.
    //
    // Important invariant: we only exit this loop successfully (i.e., not
    // returning an error) if we have positive proof of a position at or
    // beyond the high water mark. To make this invariant easy to check, the
    // only `break` in the body of the loop is the one guarded by the
    // position check at the top; do not add any other.
    let mut last_progress: Option<ProgressRecord> = None;
    loop {
        let current_position = get_position()?;

        if current_position >= hi {
            // consumer is at or beyond the high water mark and has read enough messages
            break;
        }

        let message = match progress_client_read_committed.poll(progress_record_fetch_timeout) {
            Some(Ok(message)) => message,
            Some(Err(KafkaError::PartitionEOF(_))) => {
                // No message, but the consumer's position may have advanced
                // past a transaction control message that positions us at
                // or beyond the high water mark. Go around the loop again
                // to check.
                continue;
            }
            Some(Err(e)) => bail!("failed to fetch progress message {e}"),
            None => {
                bail!(
                    "timed out while waiting to reach high water mark of non-empty \
                    topic {progress_topic}:{partition}, lo/hi: {lo}/{hi}, current position: {current_position}"
                );
            }
        };

        if message.key() != Some(progress_key.to_bytes()) {
            // This is a progress message for a different sink.
            continue;
        }

        metrics.consumed_progress_records.inc();

        // A record for our key without a payload carries no frontier; skip it.
        let Some(payload) = message.payload() else {
            continue;
        };
        let progress = parse_progress_record(payload)?;

        // Successive records for the same key must never move the frontier backwards.
        match last_progress {
            Some(last_progress)
                if !PartialOrder::less_equal(&last_progress.frontier, &progress.frontier) =>
            {
                bail!(
                    "upper regressed in topic {progress_topic}:{partition} from {:?} to {:?}",
                    &last_progress.frontier,
                    &progress.frontier,
                );
            }
            _ => last_progress = Some(progress),
        }
    }

    // If we get here, we are assured that we've read all messages up to
    // the high water mark, and therefore `last_progress` contains the
    // most recent progress record for the sink under consideration.
    Ok(last_progress)
}
1268
/// This is the legacy struct that used to be emitted as part of a transactional produce and
/// contains the largest timestamp within the batch committed. Since it is just a timestamp it
/// cannot encode the fact that a sink has finished and deviates from upper frontier semantics.
/// Materialize no longer produces this record but it's possible that we encounter this in topics
/// written by older versions. In those cases we convert it into upper semantics by stepping the
/// timestamp forward.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct LegacyProgressRecord {
    /// The largest timestamp of the committed batch.
    ///
    /// The three possible states are:
    /// * `None`: the `timestamp` field was omitted from the record,
    /// * `Some(None)`: the field was present and explicitly set to `null`,
    /// * `Some(Some(ts))`: the field was present and held a timestamp.
    // Double Option to tell apart an omitted field from one set to null explicitly
    // https://github.com/serde-rs/serde/issues/984
    #[serde(default, deserialize_with = "deserialize_some")]
    pub timestamp: Option<Option<Timestamp>>,
}
1282
1283// Any value that is present is considered Some value, including null.
1284fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
1285where
1286 T: Deserialize<'de>,
1287 D: Deserializer<'de>,
1288{
1289 Deserialize::deserialize(deserializer).map(Some)
1290}
1291
/// This struct is emitted as part of a transactional produce, and contains the upper frontier of
/// the batch committed. It is used to recover the frontier a sink needs to resume at.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ProgressRecord {
    /// The upper frontier of the committed batch. It is (de)serialized through
    /// `serialize_frontier` / `deserialize_frontier` rather than the derived implementation.
    #[serde(
        deserialize_with = "deserialize_frontier",
        serialize_with = "serialize_frontier"
    )]
    pub frontier: Antichain<Timestamp>,
    /// Version carried by the record; falls back to the `u64` default (zero) when the field is
    /// absent from the serialized record.
    #[serde(default)]
    pub version: u64,
}
1304fn serialize_frontier<S>(frontier: &Antichain<Timestamp>, serializer: S) -> Result<S::Ok, S::Error>
1305where
1306 S: Serializer,
1307{
1308 Serialize::serialize(frontier.elements(), serializer)
1309}
1310
1311fn deserialize_frontier<'de, D>(deserializer: D) -> Result<Antichain<Timestamp>, D::Error>
1312where
1313 D: Deserializer<'de>,
1314{
1315 let times: Vec<Timestamp> = Deserialize::deserialize(deserializer)?;
1316 Ok(Antichain::from(times))
1317}
1318
1319fn parse_progress_record(payload: &[u8]) -> Result<ProgressRecord, anyhow::Error> {
1320 Ok(match serde_json::from_slice::<ProgressRecord>(payload) {
1321 Ok(progress) => progress,
1322 // If we fail to deserialize we might be reading a legacy progress record
1323 Err(_) => match serde_json::from_slice::<LegacyProgressRecord>(payload) {
1324 Ok(LegacyProgressRecord {
1325 timestamp: Some(Some(time)),
1326 }) => ProgressRecord {
1327 frontier: Antichain::from_elem(time.step_forward()),
1328 version: 0,
1329 },
1330 Ok(LegacyProgressRecord {
1331 timestamp: Some(None),
1332 }) => ProgressRecord {
1333 frontier: Antichain::new(),
1334 version: 0,
1335 },
1336 _ => match std::str::from_utf8(payload) {
1337 Ok(payload) => bail!("invalid progress record: {payload}"),
1338 Err(_) => bail!("invalid progress record bytes: {payload:?}"),
1339 },
1340 },
1341 })
1342}
1343
1344/// Fetches the partition count for the identified topic.
1345async fn fetch_partition_count(
1346 producer: &ThreadedProducer<TunnelingClientContext<MzClientContext>>,
1347 sink_id: GlobalId,
1348 topic_name: &str,
1349) -> Result<u64, anyhow::Error> {
1350 let meta = task::spawn_blocking(|| format!("kafka_sink_fetch_partition_count:{sink_id}"), {
1351 let producer = producer.clone();
1352 move || {
1353 producer
1354 .client()
1355 .fetch_metadata(None, DEFAULT_FETCH_METADATA_TIMEOUT)
1356 }
1357 })
1358 .await
1359 .check_ssh_status(producer.context())?;
1360
1361 match meta.topics().iter().find(|t| t.name() == topic_name) {
1362 Some(topic) => {
1363 let partition_count = u64::cast_from(topic.partitions().len());
1364 if partition_count == 0 {
1365 bail!("topic {topic_name} has an impossible partition count of zero");
1366 }
1367 Ok(partition_count)
1368 }
1369 None => bail!("topic {topic_name} does not exist"),
1370 }
1371}
1372
1373/// Fetches the partition count for the identified topic at the specified
1374/// interval.
1375///
1376/// When an updated partition count is discovered, invokes
1377/// `update_partition_count` with the new partition count.
1378async fn fetch_partition_count_loop<F>(
1379 producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
1380 sink_id: GlobalId,
1381 topic_name: String,
1382 interval: Duration,
1383 update_partition_count: Arc<F>,
1384) where
1385 F: Fn(u64),
1386{
1387 let mut interval = time::interval(interval);
1388 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
1389 loop {
1390 interval.tick().await;
1391 match fetch_partition_count(&producer, sink_id, &topic_name).await {
1392 Ok(pc) => update_partition_count(pc),
1393 Err(e) => {
1394 warn!(%sink_id, "failed updating partition count: {e}");
1395 continue;
1396 }
1397 };
1398 }
1399}
1400
/// Walks each arrangement batch and emits encoded Kafka messages, one per
/// `DiffPair` observed at each `(key, timestamp)`.
///
/// When `key_is_synthetic`, the batch keys are per-row hashes used only for
/// worker distribution; the emitted `KafkaMessage` uses no key in that case.
///
/// Before any data is processed, the key and value encoders are built from the
/// connection's formats. For Avro formats this connects to the schema registry and
/// publishes the schema; a failure there makes the operator return an error.
///
/// Returns the collection of encoded messages, a stream of halting health statuses derived
/// from the operator's errors, and a button that shuts the operator down when dropped.
fn encode_collection<'scope>(
    name: String,
    batches: SinkBatchStream<'scope>,
    envelope: SinkEnvelope,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
    sink_id: GlobalId,
    from_id: GlobalId,
    key_is_synthetic: bool,
) -> (
    VecCollection<'scope, Timestamp, KafkaMessage, Diff>,
    StreamVec<'scope, Timestamp, HealthStatusMessage>,
    PressOnDropButton,
) {
    let mut builder = AsyncOperatorBuilder::new(name, batches.scope());

    let (output, stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let mut input = builder.new_input_for(batches, Pipeline, &output);

    let (button, errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let key_desc = connection
                .key_desc_and_indices
                .as_ref()
                .map(|(desc, _indices)| desc.clone());
            let value_desc = connection.value_desc;

            // The key encoder exists only when both a key description and a key format are
            // configured; a mismatch between the two is reported as an error.
            let key_encoder: Option<Box<dyn Encode>> =
                match (key_desc, connection.format.key_format) {
                    (Some(desc), Some(KafkaSinkFormatType::Bytes)) => {
                        Some(Box::new(BinaryEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Text)) => {
                        Some(Box::new(TextEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Json)) => {
                        Some(Box::new(JsonEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Avro {
                        schema,
                        compatibility_level,
                        wire_format,
                    })) => {
                        // Ensure that schemas are registered with the schema registry.
                        //
                        // Note that where this lies in the rendering cycle means that we will publish the
                        // schemas each time the sink is rendered.
                        let csr_connection = match wire_format {
                            WireFormat::Confluent {
                                registry: Some(csr),
                            } => csr,
                            // Sinks are only ever built with a Confluent
                            // registry (see the sink planner), so anything
                            // else is unreachable.
                            other => unreachable!(
                                "sink Avro key wire_format must be Confluent with registry, got {:?}",
                                other
                            ),
                        };
                        let ccsr = csr_connection
                            .connect(&storage_configuration, InTask::Yes)
                            .await?;

                        let schema_id = mz_storage_client::sink::publish_kafka_schema(
                            ccsr,
                            format!("{}-key", connection.topic),
                            schema.clone(),
                            mz_ccsr::SchemaType::Avro,
                            compatibility_level,
                        )
                        .await
                        .context("error publishing kafka schemas for sink")?;

                        Some(Box::new(AvroEncoder::new(desc, false, &schema, schema_id)))
                    }
                    (None, None) => None,
                    (desc, format) => {
                        return Err(anyhow!(
                            "key_desc and key_format must be both set or both unset, but key_desc: {:?}, key_format: {:?}",
                            desc,
                            format
                        ))
                    }
                };

            // whether to apply the debezium envelope to the value encoding
            let debezium = matches!(envelope, SinkEnvelope::Debezium);

            let value_encoder: Box<dyn Encode> = match connection.format.value_format {
                KafkaSinkFormatType::Bytes => Box::new(BinaryEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Text => Box::new(TextEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Json => Box::new(JsonEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level,
                    wire_format,
                } => {
                    // As for the key: only a Confluent wire format with a registry is expected.
                    let csr_connection = match wire_format {
                        WireFormat::Confluent {
                            registry: Some(csr),
                        } => csr,
                        other => unreachable!(
                            "sink Avro value wire_format must be Confluent with registry, got {:?}",
                            other
                        ),
                    };
                    // Ensure that schemas are registered with the schema registry.
                    //
                    // Note that where this lies in the rendering cycle means that we will publish the
                    // schemas each time the sink is rendered.
                    let ccsr = csr_connection
                        .connect(&storage_configuration, InTask::Yes)
                        .await?;

                    let schema_id = mz_storage_client::sink::publish_kafka_schema(
                        ccsr,
                        format!("{}-value", connection.topic),
                        schema.clone(),
                        mz_ccsr::SchemaType::Avro,
                        compatibility_level,
                    )
                    .await
                    .context("error publishing kafka schemas for sink")?;

                    Box::new(AvroEncoder::new(value_desc, debezium, &schema, schema_id))
                }
            };

            // !IMPORTANT!
            // Correctness of this operator relies on no fallible operations happening after this
            // point. This is a temporary workaround of build_fallible's bad interaction of owned
            // capabilities and errors.
            // TODO(petrosagg): Make the fallible async operator safe
            *capset = CapabilitySet::new();

            // Buffers reused across all messages: `row_buf` for the Debezium-formatted value
            // and `datums` for unpacking the row used by headers / partition-by.
            let mut row_buf = Row::default();
            let mut datums = DatumVec::new();
            // Primary-key violation tracking is skipped entirely for synthetic keys.
            let mut pk_warner =
                (!key_is_synthetic).then(|| PkViolationWarner::new(sink_id, from_id));

            while let Some(event) = input.next().await {
                // Only data events are handled here; any other event is ignored.
                if let Event::Data(cap, mut batches) = event {
                    for batch in batches.drain(..) {
                        for_each_diff_pair(&batch, |key, time, value| {
                            if let Some(warner) = pk_warner.as_mut() {
                                warner.observe(key, time);
                            }
                            // Only emit the arrangement key when the user configured one; relation-key
                            // and synthetic-hash arrangements exist purely for grouping / worker
                            // distribution and have no corresponding key encoder.
                            let key_for_message = if key_encoder.is_some() { key } else { &None };

                            let mut hash = None;
                            let mut headers = vec![];
                            if connection.headers_index.is_some()
                                || connection.partition_by.is_some()
                            {
                                // Header values and partition by values are derived from the row
                                // that produces an event. But it is ambiguous whether to use the
                                // `before` or `after` from the event. The rule applied here is
                                // simple: use `after` if it exists (insertions and updates),
                                // otherwise fall back to `before` (deletions).
                                //
                                // It is up to the SQL planner to ensure this produces sensible
                                // results. (When using the upsert envelope and both `before` and
                                // `after` are present, it's always unambiguous to use `after`
                                // because that's all that will be present in the Kafka message;
                                // when using the Debezium envelope, it's okay to refer to columns
                                // in the key because those are guaranteed to be the same in both
                                // `before` and `after`.)
                                let row = value
                                    .after
                                    .as_ref()
                                    .or(value.before.as_ref())
                                    .expect("one of before or after must be set");
                                let row = datums.borrow_with(row);

                                if let Some(i) = connection.headers_index {
                                    headers = encode_headers(row[i]);
                                }

                                if let Some(partition_by) = &connection.partition_by {
                                    hash = Some(evaluate_partition_by(partition_by, &row));
                                }
                            }
                            // An explicit partition-by hash takes precedence; otherwise the hash
                            // comes from the encoded key, or is zero when there is no key.
                            let (encoded_key, hash) = match key_for_message {
                                Some(key) => {
                                    let key_encoder =
                                        key_encoder.as_ref().expect("key present");
                                    let encoded = key_encoder.encode_unchecked(key.clone());
                                    let hash =
                                        hash.unwrap_or_else(|| key_encoder.hash(&encoded));
                                    (Some(encoded), hash)
                                }
                                None => (None, hash.unwrap_or(0)),
                            };
                            // With the upsert envelope the message value is the `after` row
                            // (absent when there is none); with Debezium it is the formatted
                            // before/after pair.
                            let value = match envelope {
                                SinkEnvelope::Upsert => value.after,
                                SinkEnvelope::Debezium => {
                                    dbz_format(&mut row_buf.packer(), value);
                                    Some(row_buf.clone())
                                }
                                SinkEnvelope::Append => {
                                    unreachable!("Append envelope is not valid for Kafka sinks")
                                }
                            };
                            let value = value.map(|value| value_encoder.encode_unchecked(value));
                            let message = KafkaMessage {
                                hash,
                                key: encoded_key,
                                value,
                                headers,
                            };
                            output.give(&cap, (message, time, Diff::ONE));
                        });
                        // Flush after each batch so the final `(key, time)` group of the walk is
                        // resolved immediately — a PK violation in the last group is otherwise
                        // held until more data arrives or the operator shuts down.
                        if let Some(warner) = pk_warner.as_mut() {
                            warner.flush();
                        }
                    }
                }
            }

            Ok::<(), anyhow::Error>(())
        })
    });

    // Every operator error is surfaced as a halting health status in the Kafka namespace.
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Kafka,
    });

    (stream.as_collection(), statuses, button.press_on_drop())
}
1644
1645fn encode_headers(datum: Datum) -> Vec<KafkaHeader> {
1646 let mut out = vec![];
1647 if datum.is_null() {
1648 return out;
1649 }
1650 for (key, value) in datum.unwrap_map().iter() {
1651 out.push(KafkaHeader {
1652 key: key.into(),
1653 value: match value {
1654 Datum::Null => None,
1655 Datum::String(s) => Some(s.as_bytes().to_vec()),
1656 Datum::Bytes(b) => Some(b.to_vec()),
1657 _ => panic!("encode_headers called with unexpected header value {value:?}"),
1658 },
1659 })
1660 }
1661 out
1662}
1663
1664/// Evaluates a partition by expression on the given row, returning the hash
1665/// value to use for partition assignment.
1666///
1667/// The provided expression must have type `Int32`, `Int64`, `UInt32`, or
1668/// `UInt64`. If the expression produces an error when evaluated, or if the
1669/// expression is of a signed integer type and produces a negative value, this
1670/// function returns 0.
1671fn evaluate_partition_by(partition_by: &MirScalarExpr, row: &[Datum]) -> u64 {
1672 // NOTE(benesch): The way this function converts errors and invalid values
1673 // to 0 is somewhat surpising. Ideally, we would put the sink in a
1674 // permanently errored state if the partition by expression produces an
1675 // error or invalid value. But we don't presently have a way for sinks to
1676 // report errors (see materialize#17688), so the current behavior was determined to be
1677 // the best available option. The behavior is clearly documented in the
1678 // user-facing `CREATE SINK` docs.
1679 let temp_storage = RowArena::new();
1680 match partition_by.eval(row, &temp_storage) {
1681 Ok(Datum::UInt64(u)) => u,
1682 Ok(datum) => {
1683 // If we are here the only valid type that we should be seeing is
1684 // null. Anything else is a bug in the planner.
1685 soft_assert_or_log!(datum.is_null(), "unexpected partition_by result: {datum:?}");
1686 // We treat nulls the same as we treat errors: map them to partition 0.
1687 0
1688 }
1689 Err(_) => 0,
1690 }
1691}
1692
#[cfg(test)]
mod test {
    use mz_ore::assert_err;

    use super::*;

    /// Checks that current and legacy progress record payloads parse into the expected
    /// `ProgressRecord`, and that payloads matching neither format are rejected.
    #[mz_ore::test]
    fn progress_record_migration() {
        // Shorthand for the record a payload is expected to parse into.
        fn expected(frontier: Antichain<Timestamp>, version: u64) -> ProgressRecord {
            ProgressRecord { frontier, version }
        }

        // An object with neither a `frontier` nor a `timestamp` is not a progress record.
        assert_err!(parse_progress_record(br#"{}"#));

        // Legacy record with a timestamp: the timestamp is stepped forward into the frontier.
        let legacy_timestamp = parse_progress_record(br#"{"timestamp":1}"#).unwrap();
        assert_eq!(legacy_timestamp, expected(Antichain::from_elem(2.into()), 0));

        // Legacy record with an explicit null timestamp: the empty frontier.
        let legacy_null = parse_progress_record(br#"{"timestamp":null}"#).unwrap();
        assert_eq!(legacy_null, expected(Antichain::new(), 0));

        // Current format without a version: the version defaults to zero.
        let single_element = parse_progress_record(br#"{"frontier":[1]}"#).unwrap();
        assert_eq!(single_element, expected(Antichain::from_elem(1.into()), 0));

        let empty_frontier = parse_progress_record(br#"{"frontier":[]}"#).unwrap();
        assert_eq!(empty_frontier, expected(Antichain::new(), 0));

        // Current format with an explicit version.
        let versioned = parse_progress_record(br#"{"frontier":[], "version": 42}"#).unwrap();
        assert_eq!(versioned, expected(Antichain::new(), 42));

        // A null frontier is invalid in the current format and is not a legacy record either.
        assert_err!(parse_progress_record(br#"{"frontier":null}"#));
    }
}