use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::future::Future;
use std::rc::Rc;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Weak};
use std::time::Duration;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::metrics::sink::kafka::KafkaSinkMetrics;
use crate::render::sinks::SinkRender;
use crate::statistics::SinkStatistics;
use crate::storage_state::StorageState;
use anyhow::{Context, anyhow, bail};
use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::StreamExt;
use maplit::btreemap;
use mz_expr::MirScalarExpr;
use mz_interchange::avro::{AvroEncoder, DiffPair};
use mz_interchange::encode::Encode;
use mz_interchange::envelopes::dbz_format;
use mz_interchange::json::JsonEncoder;
use mz_interchange::text_binary::{BinaryEncoder, TextEncoder};
use mz_kafka_util::admin::EnsureTopicConfig;
use mz_kafka_util::client::{
    DEFAULT_FETCH_METADATA_TIMEOUT, GetPartitionsError, MzClientContext, TimeoutConfig,
    TunnelingClientContext,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::soft_assert_or_log;
use mz_ore::task::{self, AbortOnDropHandle};
use mz_persist_client::Diagnostics;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_client::sink::progress_key::ProgressKey;
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::{
    KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS, SINK_ENSURE_TOPIC_CONFIG, SINK_PROGRESS_SEARCH,
};
use mz_storage_types::errors::{ContextCreationError, ContextCreationErrorExt, DataflowError};
use mz_storage_types::sinks::{
    KafkaSinkConnection, KafkaSinkFormatType, SinkEnvelope, StorageSinkDesc,
};
use mz_storage_types::sources::SourceData;
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
use rdkafka::error::KafkaError;
use rdkafka::message::{Header, OwnedHeaders, ToBytes};
use rdkafka::producer::{BaseRecord, Producer, ThreadedProducer};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::{Message, Offset, Statistics, TopicPartitionList};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::StreamVec;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::vec::{Map, ToStream};
use timely::dataflow::operators::{CapabilitySet, Concatenate};
use timely::progress::{Antichain, Timestamp as _};
use tokio::sync::watch;
use tokio::time::{self, MissedTickBehavior};
use tracing::{debug, error, info, warn};

impl<'scope> SinkRender<'scope> for KafkaSinkConnection {
    fn get_key_indices(&self) -> Option<&[usize]> {
        self.key_desc_and_indices
            .as_ref()
            .map(|(_desc, indices)| indices.as_slice())
    }

    fn get_relation_key_indices(&self) -> Option<&[usize]> {
        self.relation_key_indices.as_deref()
    }

    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        input: VecCollection<'scope, Timestamp, (Option<Row>, DiffPair<Row>), Diff>,
        _err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
    ) -> (
        StreamVec<'scope, Timestamp, HealthStatusMessage>,
        Vec<PressOnDropButton>,
    ) {
        let scope = input.scope();

        let write_handle = {
            let persist = Arc::clone(&storage_state.persist_clients);
            let shard_meta = sink.to_storage_metadata.clone();
            async move {
                let client = persist.open(shard_meta.persist_location).await?;
                let handle = client
                    .open_writer(
                        shard_meta.data_shard,
                        Arc::new(shard_meta.relation_desc),
                        Arc::new(UnitSchema),
                        Diagnostics::from_purpose("sink handle"),
                    )
                    .await?;
                Ok(handle)
            }
        };

        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        storage_state
            .sink_write_frontiers
            .insert(sink_id, Rc::clone(&write_frontier));

        let (encoded, encode_status, encode_token) = encode_collection(
            format!("kafka-{sink_id}-{}-encode", self.format.get_format_name()),
            input,
            sink.envelope,
            self.clone(),
            storage_state.storage_configuration.clone(),
        );

        let metrics = storage_state.metrics.get_kafka_sink_metrics(sink_id);
        let statistics = storage_state
            .aggregated_statistics
            .get_sink(&sink_id)
            .expect("statistics initialized")
            .clone();

        let (sink_status, sink_token) = sink_collection(
            format!("kafka-{sink_id}-sink"),
            encoded,
            sink_id,
            self.clone(),
            storage_state.storage_configuration.clone(),
            sink,
            metrics,
            statistics,
            write_handle,
            write_frontier,
        );

        let running_status = Some(HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::Running,
            namespace: StatusNamespace::Kafka,
        })
        .to_stream(scope);

        let status = scope.concatenate([running_status, encode_status, sink_status]);

        (status, vec![encode_token, sink_token])
    }
}

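/// A transactional Kafka producer for a single sink: data messages are staged
/// to the data topic and a progress record is written to the progress topic,
/// all within Kafka transactions. Staged message and byte counts are tracked
/// so they can be reported as committed once a transaction succeeds.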
struct TransactionalProducer {
    task_name: String,
    data_topic: String,
    progress_topic: String,
    progress_key: ProgressKey,
    sink_version: u64,
    partition_count: Arc<AtomicU64>,
    _partition_count_task: AbortOnDropHandle<()>,
    producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    statistics: SinkStatistics,
    staged_messages: u64,
    staged_bytes: u64,
    socket_timeout: Duration,
    transaction_timeout: Duration,
}

impl TransactionalProducer {
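    /// Creates a transactional producer for the given sink: spawns the
    /// statistics collector and partition-count refresh tasks, initializes
    /// Kafka transactions, and determines the frontier at which the sink
    /// should resume based on the latest progress record.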
    async fn new(
        sink_id: GlobalId,
        connection: &KafkaSinkConnection,
        storage_configuration: &StorageConfiguration,
        metrics: Arc<KafkaSinkMetrics>,
        statistics: SinkStatistics,
        sink_version: u64,
    ) -> Result<(Self, Antichain<mz_repr::Timestamp>), ContextCreationError> {
        let client_id = connection.client_id(
            storage_configuration.config_set(),
            &storage_configuration.connection_context,
            sink_id,
        );
        let transactional_id =
            connection.transactional_id(&storage_configuration.connection_context, sink_id);

        let timeout_config = &storage_configuration.parameters.kafka_timeout_config;
        let mut options = BTreeMap::new();
        options.insert("enable.idempotence", "true".into());
        options.insert(
            "compression.type",
            connection.compression_type.to_librdkafka_option().into(),
        );
        options.insert("queue.buffering.max.kbytes", "2147483647".into());
        options.insert("queue.buffering.max.messages", "0".into());
        options.insert("queue.buffering.max.ms", format!("{}", 10));
        options.insert(
            "transaction.timeout.ms",
            format!("{}", timeout_config.transaction_timeout.as_millis()),
        );
        options.insert("transactional.id", transactional_id);
        options.insert("client.id", client_id);
        options.insert("statistics.interval.ms", "1000".into());

        let ctx = MzClientContext::default();

        let stats_receiver = ctx.subscribe_statistics();
        let task_name = format!("kafka_sink_metrics_collector:{sink_id}");
        task::spawn(
            || &task_name,
            collect_statistics(stats_receiver, Arc::clone(&metrics)),
        );

        let producer: ThreadedProducer<_> = connection
            .connection
            .create_with_context(storage_configuration, ctx, &options, InTask::Yes)
            .await?;

        let partition_count = Arc::new(AtomicU64::new(0));
        let update_partition_count = {
            let partition_count = Arc::clone(&partition_count);
            let metrics = Arc::clone(&metrics);
            Arc::new(move |pc| {
                partition_count.store(pc, std::sync::atomic::Ordering::SeqCst);
                metrics.partition_count.set(pc);
            })
        };

        let partition_count_task = task::spawn(
            || format!("kafka_sink_producer_fetch_metadata_loop:{sink_id}"),
            fetch_partition_count_loop(
                producer.clone(),
                sink_id,
                connection.topic.clone(),
                connection.topic_metadata_refresh_interval,
                Arc::clone(&update_partition_count),
            ),
        );

        let task_name = format!("kafka_sink_producer:{sink_id}");
        let progress_key = ProgressKey::new(sink_id);

        let producer = Self {
            task_name,
            data_topic: connection.topic.clone(),
            partition_count,
            _partition_count_task: partition_count_task.abort_on_drop(),
            progress_topic: connection
                .progress_topic(&storage_configuration.connection_context)
                .into_owned(),
            progress_key,
            sink_version,
            producer,
            statistics,
            staged_messages: 0,
            staged_bytes: 0,
            socket_timeout: timeout_config.socket_timeout,
            transaction_timeout: timeout_config.transaction_timeout,
        };

        let timeout = timeout_config.socket_timeout;
        producer
            .spawn_blocking(move |p| p.init_transactions(timeout))
            .await?;

        let progress = determine_sink_progress(
            sink_id,
            connection,
            storage_configuration,
            Arc::clone(&metrics),
        )
        .await?;

        let resume_upper = match progress {
            Some(progress) => {
                if sink_version < progress.version {
                    return Err(ContextCreationError::Other(anyhow!(
                        "Fenced off by newer version of the sink. ours={} theirs={}",
                        sink_version,
                        progress.version
                    )));
                }
                progress.frontier
            }
            None => {
                mz_storage_client::sink::ensure_kafka_topic(
                    connection,
                    storage_configuration,
                    &connection.topic,
                    &connection.topic_options,
                    EnsureTopicConfig::Skip,
                )
                .await?;
                Antichain::from_elem(Timestamp::minimum())
            }
        };

        let partition_count =
            fetch_partition_count(&producer.producer, sink_id, &connection.topic).await?;
        update_partition_count(partition_count);

        Ok((producer, resume_upper))
    }

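    /// Runs the given closure against a clone of the underlying producer on a
    /// blocking task, checking the SSH tunnel status when the call fails.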
    async fn spawn_blocking<F, R>(&self, f: F) -> Result<R, ContextCreationError>
    where
        F: FnOnce(
                ThreadedProducer<TunnelingClientContext<MzClientContext>>,
            ) -> Result<R, KafkaError>
            + Send
            + 'static,
        R: Send + 'static,
    {
        let producer = self.producer.clone();
        task::spawn_blocking(|| &self.task_name, move || f(producer))
            .await
            .check_ssh_status(self.producer.context())
    }

    async fn begin_transaction(&self) -> Result<(), ContextCreationError> {
        self.spawn_blocking(|p| p.begin_transaction()).await
    }

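    /// Stages a message for the data topic, routing it to the partition given
    /// by `message.hash` modulo the current partition count and attaching a
    /// `materialize-timestamp` header. Delivery happens when the surrounding
    /// transaction commits; staged message and byte counts are updated here.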
    fn send(
        &mut self,
        message: &KafkaMessage,
        time: Timestamp,
        diff: Diff,
    ) -> Result<(), KafkaError> {
        assert_eq!(diff, Diff::ONE, "invalid sink update");

        let mut headers = OwnedHeaders::new().insert(Header {
            key: "materialize-timestamp",
            value: Some(time.to_string().as_bytes()),
        });
        for header in &message.headers {
            if header.key.starts_with("materialize-") {
                continue;
            }

            headers = headers.insert(Header {
                key: header.key.as_str(),
                value: header.value.as_ref(),
            });
        }

        let pc = self
            .partition_count
            .load(std::sync::atomic::Ordering::SeqCst);
        let partition = Some(i32::try_from(message.hash % pc).unwrap());

        let record = BaseRecord {
            topic: &self.data_topic,
            key: message.key.as_ref(),
            payload: message.value.as_ref(),
            headers: Some(headers),
            partition,
            timestamp: None,
            delivery_opaque: (),
        };
        let key_size = message.key.as_ref().map(|k| k.len()).unwrap_or(0);
        let value_size = message.value.as_ref().map(|k| k.len()).unwrap_or(0);
        let headers_size = message
            .headers
            .iter()
            .map(|h| h.key.len() + h.value.as_ref().map(|v| v.len()).unwrap_or(0))
            .sum::<usize>();
        let record_size = u64::cast_from(key_size + value_size + headers_size);
        self.statistics.inc_messages_staged_by(1);
        self.staged_messages += 1;
        self.statistics.inc_bytes_staged_by(record_size);
        self.staged_bytes += record_size;
        self.producer.send(record).map_err(|(e, _)| e)
    }

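    /// Writes a progress record for `upper` into the current transaction and
    /// commits it, moving staged statistics to committed on success. If the
    /// commit fails with a transaction error that requires an abort, the
    /// transaction is aborted before the error is returned.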
    async fn commit_transaction(
        &mut self,
        upper: Antichain<Timestamp>,
    ) -> Result<(), ContextCreationError> {
        let progress = ProgressRecord {
            frontier: upper,
            version: self.sink_version,
        };
        let payload = serde_json::to_vec(&progress).expect("infallible");
        let record = BaseRecord::to(&self.progress_topic)
            .payload(&payload)
            .key(&self.progress_key);
        self.producer.send(record).map_err(|(e, _)| e)?;

        fail::fail_point!("kafka_sink_commit_transaction");

        let timeout = self.transaction_timeout;
        match self
            .spawn_blocking(move |p| p.commit_transaction(timeout))
            .await
        {
            Ok(()) => {
                self.statistics
                    .inc_messages_committed_by(self.staged_messages);
                self.statistics.inc_bytes_committed_by(self.staged_bytes);
                self.staged_messages = 0;
                self.staged_bytes = 0;
                Ok(())
            }
            Err(ContextCreationError::KafkaError(KafkaError::Transaction(err))) => {
                if err.txn_requires_abort() {
                    let timeout = self.socket_timeout;
                    self.spawn_blocking(move |p| p.abort_transaction(timeout))
                        .await?;
                }
                Err(ContextCreationError::KafkaError(KafkaError::Transaction(
                    err,
                )))
            }
            Err(err) => Err(err),
        }
    }
}

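/// Forwards statistics reported by the librdkafka client to the sink's
/// metrics for as long as the statistics channel stays open.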
async fn collect_statistics(
    mut receiver: watch::Receiver<Statistics>,
    metrics: Arc<KafkaSinkMetrics>,
) {
    let mut outbuf_cnt: i64 = 0;
    let mut outbuf_msg_cnt: i64 = 0;
    let mut waitresp_cnt: i64 = 0;
    let mut waitresp_msg_cnt: i64 = 0;
    let mut txerrs: u64 = 0;
    let mut txretries: u64 = 0;
    let mut req_timeouts: u64 = 0;
    let mut connects: i64 = 0;
    let mut disconnects: i64 = 0;
    while receiver.changed().await.is_ok() {
        let stats = receiver.borrow();
        for broker in stats.brokers.values() {
            outbuf_cnt += broker.outbuf_cnt;
            outbuf_msg_cnt += broker.outbuf_msg_cnt;
            waitresp_cnt += broker.waitresp_cnt;
            waitresp_msg_cnt += broker.waitresp_msg_cnt;
            txerrs += broker.txerrs;
            txretries += broker.txretries;
            req_timeouts += broker.req_timeouts;
            connects += broker.connects.unwrap_or(0);
            disconnects += broker.disconnects.unwrap_or(0);
        }
        metrics.rdkafka_msg_cnt.set(stats.msg_cnt);
        metrics.rdkafka_msg_size.set(stats.msg_size);
        metrics.rdkafka_txmsgs.set(stats.txmsgs);
        metrics.rdkafka_txmsg_bytes.set(stats.txmsg_bytes);
        metrics.rdkafka_tx.set(stats.tx);
        metrics.rdkafka_tx_bytes.set(stats.tx_bytes);
        metrics.rdkafka_outbuf_cnt.set(outbuf_cnt);
        metrics.rdkafka_outbuf_msg_cnt.set(outbuf_msg_cnt);
        metrics.rdkafka_waitresp_cnt.set(waitresp_cnt);
        metrics.rdkafka_waitresp_msg_cnt.set(waitresp_msg_cnt);
        metrics.rdkafka_txerrs.set(txerrs);
        metrics.rdkafka_txretries.set(txretries);
        metrics.rdkafka_req_timeouts.set(req_timeouts);
        metrics.rdkafka_connects.set(connects);
        metrics.rdkafka_disconnects.set(disconnects);
    }
}

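/// A fully encoded message destined for the data topic, together with the
/// hash used to choose its partition.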
#[derive(Debug, Clone, Serialize, Deserialize)]
struct KafkaMessage {
    hash: u64,
    key: Option<Vec<u8>>,
    value: Option<Vec<u8>>,
    headers: Vec<KafkaHeader>,
}

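/// A Kafka header attached to an outgoing message.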
#[derive(Debug, Clone, Serialize, Deserialize)]
struct KafkaHeader {
    key: String,
    value: Option<Vec<u8>>,
}

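/// Writes the already-encoded collection to Kafka on a single worker,
/// batching updates into transactions that are committed whenever the input
/// frontier advances. Each committed transaction records the new upper in the
/// progress topic, advances the persist shard's upper, and updates
/// `write_frontier`. Errors surface as halting health status messages.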
fn sink_collection<'scope>(
    name: String,
    input: VecCollection<'scope, Timestamp, KafkaMessage, Diff>,
    sink_id: GlobalId,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    metrics: KafkaSinkMetrics,
    statistics: SinkStatistics,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
) -> (
    StreamVec<'scope, Timestamp, HealthStatusMessage>,
    PressOnDropButton,
) {
    let scope = input.scope();
    let mut builder = AsyncOperatorBuilder::new(name.clone(), input.inner.scope());

    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let buffer_min_capacity =
        KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS.handle(storage_configuration.config_set());

    let mut input = builder.new_disconnected_input(input.inner, Exchange::new(move |_| hashed_id));

    let as_of = sink.as_of.clone();
    let sink_version = sink.version;
    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            if !is_active_worker {
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            fail::fail_point!("kafka_sink_creation_error", |_| Err(
                ContextCreationError::Other(anyhow::anyhow!("synthetic error"))
            ));

            let mut write_handle = write_handle.await?;

            let metrics = Arc::new(metrics);

            let (mut producer, resume_upper) = TransactionalProducer::new(
                sink_id,
                &connection,
                &storage_configuration,
                Arc::clone(&metrics),
                statistics,
                sink_version,
            )
            .await?;

            let overcompacted =
                *resume_upper != [Timestamp::minimum()] &&
                !PartialOrder::less_equal(&as_of, &resume_upper);
            if overcompacted {
                let err = format!(
                    "{name}: input compacted past resume upper: as_of {}, resume_upper: {}",
                    as_of.pretty(),
                    resume_upper.pretty()
                );
                error!("{err}");
                return Err(anyhow!("{err}").into());
            }

            info!(
                "{name}: as_of: {}, resume upper: {}",
                as_of.pretty(),
                resume_upper.pretty()
            );

            let Some(mut upper) = resume_upper.clone().into_option() else {
                write_frontier.borrow_mut().clear();
                return Ok(());
            };

            let mut deferred_updates = vec![];
            let mut extra_updates = vec![];
            let mut transaction_begun = false;
            while let Some(event) = input.next().await {
                match event {
                    Event::Data(_cap, batch) => {
                        for (message, time, diff) in batch {
                            match upper.cmp(&time) {
                                Ordering::Less => deferred_updates.push((message, time, diff)),
                                Ordering::Equal => {
                                    if !transaction_begun {
                                        producer.begin_transaction().await?;
                                        transaction_begun = true;
                                    }
                                    producer.send(&message, time, diff)?;
                                }
                                Ordering::Greater => continue,
                            }
                        }
                    }
                    Event::Progress(progress) => {
                        if !PartialOrder::less_equal(&resume_upper, &progress) {
                            continue;
                        }
                        if !as_of.iter().all(|t| !progress.less_equal(t)) {
                            continue;
                        }
                        if !transaction_begun {
                            producer.begin_transaction().await?;
                        }

                        deferred_updates.shrink_to(buffer_min_capacity.get());
                        extra_updates.extend(
                            deferred_updates
                                .extract_if(.., |(_, time, _)| !progress.less_equal(time)),
                        );
                        extra_updates.sort_unstable_by(|a, b| a.1.cmp(&b.1));

                        extra_updates.shrink_to(buffer_min_capacity.get());
                        for (message, time, diff) in extra_updates.drain(..) {
                            producer.send(&message, time, diff)?;
                        }

                        debug!("{name}: committing transaction for {}", progress.pretty());
                        producer.commit_transaction(progress.clone()).await?;
                        transaction_begun = false;
                        let mut expect_upper = write_handle.shared_upper();
                        loop {
                            if PartialOrder::less_equal(&progress, &expect_upper) {
                                break;
                            }
                            const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                            match write_handle
                                .compare_and_append(EMPTY, expect_upper, progress.clone())
                                .await
                                .expect("valid usage")
                            {
                                Ok(()) => break,
                                Err(mismatch) => {
                                    expect_upper = mismatch.current;
                                }
                            }
                        }
                        write_frontier.borrow_mut().clone_from(&progress);
                        match progress.into_option() {
                            Some(new_upper) => upper = new_upper,
                            None => break,
                        }
                    }
                }
            }
            Ok(())
        })
    });

    let statuses = errors.map(|error: Rc<ContextCreationError>| {
        let hint = match *error {
            ContextCreationError::KafkaError(KafkaError::Transaction(ref e)) => {
                if e.is_retriable() && e.code() == RDKafkaErrorCode::OperationTimedOut {
                    let hint = "If you're running a single Kafka broker, ensure that the configs \
                        transaction.state.log.replication.factor, transaction.state.log.min.isr, \
                        and offsets.topic.replication.factor are set to 1 on the broker";
                    Some(hint.to_owned())
                } else {
                    None
                }
            }
            _ => None,
        };

        HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), hint),
            namespace: if matches!(*error, ContextCreationError::Ssh(_)) {
                StatusNamespace::Ssh
            } else {
                StatusNamespace::Kafka
            },
        }
    });

    (statuses, button.press_on_drop())
}

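/// Determines the latest progress record for this sink, if any, by scanning
/// the progress topic with a read-committed consumer. Returns `None` if the
/// progress topic does not exist or contains no record for this sink's
/// progress key.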
async fn determine_sink_progress(
    sink_id: GlobalId,
    connection: &KafkaSinkConnection,
    storage_configuration: &StorageConfiguration,
    metrics: Arc<KafkaSinkMetrics>,
) -> Result<Option<ProgressRecord>, ContextCreationError> {
    let TimeoutConfig {
        fetch_metadata_timeout,
        progress_record_fetch_timeout,
        ..
    } = storage_configuration.parameters.kafka_timeout_config;

    let client_id = connection.client_id(
        storage_configuration.config_set(),
        &storage_configuration.connection_context,
        sink_id,
    );
    let group_id =
        connection.progress_group_id(&storage_configuration.connection_context, sink_id);
    let progress_topic = connection
        .progress_topic(&storage_configuration.connection_context)
        .into_owned();
    let progress_topic_options = &connection.connection.progress_topic_options;
    let progress_key = ProgressKey::new(sink_id);

    let common_options = btreemap! {
        "group.id" => group_id,
        "client.id" => client_id,
        "enable.auto.commit" => "false".into(),
        "auto.offset.reset" => "earliest".into(),
        "enable.partition.eof" => "true".into(),
    };

    let progress_client_read_committed: BaseConsumer<_> = {
        let mut opts = common_options.clone();
        opts.insert("isolation.level", "read_committed".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    let progress_client_read_uncommitted: BaseConsumer<_> = {
        let mut opts = common_options;
        opts.insert("isolation.level", "read_uncommitted".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    let ctx = Arc::clone(progress_client_read_committed.client().context());

    let ensure_topic_config =
        match &*SINK_ENSURE_TOPIC_CONFIG.get(storage_configuration.config_set()) {
            "skip" => EnsureTopicConfig::Skip,
            "check" => EnsureTopicConfig::Check,
            "alter" => EnsureTopicConfig::Alter,
            _ => {
                tracing::warn!(
                    topic = progress_topic,
                    "unexpected value for ensure-topic-config; skipping checks"
                );
                EnsureTopicConfig::Skip
            }
        };
    mz_storage_client::sink::ensure_kafka_topic(
        connection,
        storage_configuration,
        &progress_topic,
        progress_topic_options,
        ensure_topic_config,
    )
    .await
    .add_context("error registering kafka progress topic for sink")?;

    let parent_token = Arc::new(());
    let child_token = Arc::downgrade(&parent_token);
    let task_name = format!("get_latest_ts:{sink_id}");
    let sink_progress_search = SINK_PROGRESS_SEARCH.get(storage_configuration.config_set());
    let result = task::spawn_blocking(|| task_name, move || {
        let progress_topic = progress_topic.as_ref();
        let partitions = match mz_kafka_util::client::get_partitions(
            progress_client_read_committed.client(),
            progress_topic,
            fetch_metadata_timeout,
        ) {
            Ok(partitions) => partitions,
            Err(GetPartitionsError::TopicDoesNotExist) => {
                return Ok(None);
            }
            e => e.with_context(|| {
                format!(
                    "Unable to fetch metadata about progress topic {}",
                    progress_topic
                )
            })?,
        };
        if partitions.len() != 1 {
            bail!(
                "Progress topic {} should contain a single partition, but instead contains {} partitions",
                progress_topic, partitions.len(),
            );
        }
        let partition = partitions.into_element();

        metrics.consumed_progress_records.set(0);

        let (lo, hi) = progress_client_read_uncommitted
            .fetch_watermarks(progress_topic, partition, fetch_metadata_timeout)
            .map_err(|e| {
                anyhow!(
                    "Failed to fetch metadata while reading from progress topic: {}",
                    e
                )
            })?;

        let mut start_indices = vec![lo];
        if sink_progress_search {
            let mut lookback = hi.saturating_sub(lo) / 10;
            while lookback >= 20_000 {
                start_indices.push(hi - lookback);
                lookback /= 10;
            }
        }
        for lo in start_indices.into_iter().rev() {
            if let Some(found) = progress_search(
                &progress_client_read_committed,
                progress_record_fetch_timeout,
                progress_topic,
                partition,
                lo,
                hi,
                progress_key.clone(),
                Weak::clone(&child_token),
                Arc::clone(&metrics)
            )? {
                return Ok(Some(found));
            }
        }
        Ok(None)
    }).await.check_ssh_status(&ctx);
    drop(parent_token);
    result
}

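/// Scans the progress topic from offset `lo` towards `hi` and returns the
/// last progress record written under `progress_key`, if any. Bails if the
/// recorded upper ever regresses or if the parent task has been cancelled.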
fn progress_search<C: ConsumerContext + 'static>(
    progress_client_read_committed: &BaseConsumer<C>,
    progress_record_fetch_timeout: Duration,
    progress_topic: &str,
    partition: i32,
    lo: i64,
    hi: i64,
    progress_key: ProgressKey,
    child_token: Weak<()>,
    metrics: Arc<KafkaSinkMetrics>,
) -> anyhow::Result<Option<ProgressRecord>> {
    let mut tps = TopicPartitionList::new();
    tps.add_partition(progress_topic, partition);
    tps.set_partition_offset(progress_topic, partition, Offset::Offset(lo))?;
    progress_client_read_committed
        .assign(&tps)
        .with_context(|| {
            format!(
                "Error seeking in progress topic {}:{}",
                progress_topic, partition
            )
        })?;

    let get_position = || {
        if child_token.strong_count() == 0 {
            bail!("operation cancelled");
        }
        let position = progress_client_read_committed
            .position()?
            .find_partition(progress_topic, partition)
            .ok_or_else(|| {
                anyhow!(
                    "No position info found for progress topic {}",
                    progress_topic
                )
            })?
            .offset();
        let position = match position {
            Offset::Offset(position) => position,
            Offset::Invalid => lo,
            _ => bail!(
                "Consumer::position returned offset of wrong type: {:?}",
                position
            ),
        };
        let outstanding = u64::try_from(std::cmp::max(0, hi - position)).unwrap();
        metrics.outstanding_progress_records.set(outstanding);
        Ok(position)
    };

    info!("fetching latest progress record for {progress_key}, lo/hi: {lo}/{hi}");

    let mut last_progress: Option<ProgressRecord> = None;
    loop {
        let current_position = get_position()?;

        if current_position >= hi {
            break;
        }

        let message = match progress_client_read_committed.poll(progress_record_fetch_timeout) {
            Some(Ok(message)) => message,
            Some(Err(KafkaError::PartitionEOF(_))) => {
                continue;
            }
            Some(Err(e)) => bail!("failed to fetch progress message {e}"),
            None => {
                bail!(
                    "timed out while waiting to reach high water mark of non-empty \
                     topic {progress_topic}:{partition}, lo/hi: {lo}/{hi}, current position: {current_position}"
                );
            }
        };

        if message.key() != Some(progress_key.to_bytes()) {
            continue;
        }

        metrics.consumed_progress_records.inc();

        let Some(payload) = message.payload() else {
            continue;
        };
        let progress = parse_progress_record(payload)?;

        match last_progress {
            Some(last_progress)
                if !PartialOrder::less_equal(&last_progress.frontier, &progress.frontier) =>
            {
                bail!(
                    "upper regressed in topic {progress_topic}:{partition} from {:?} to {:?}",
                    &last_progress.frontier,
                    &progress.frontier,
                );
            }
            _ => last_progress = Some(progress),
        }
    }

    Ok(last_progress)
}

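/// Legacy wire format for progress records, retained so that records written
/// by older versions of the sink can still be decoded. The nested `Option`
/// distinguishes a `timestamp` field that is present but `null` from one that
/// is absent entirely.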
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct LegacyProgressRecord {
    #[serde(default, deserialize_with = "deserialize_some")]
    pub timestamp: Option<Option<Timestamp>>,
}

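/// Wraps any present value in `Some`, so that a field explicitly set to
/// `null` can be told apart from a field that is missing.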
fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
where
    T: Deserialize<'de>,
    D: Deserializer<'de>,
{
    Deserialize::deserialize(deserializer).map(Some)
}

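/// The current wire format for progress records: the frontier up to which the
/// sink has published data, plus the version of the sink that wrote it.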
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ProgressRecord {
    #[serde(
        deserialize_with = "deserialize_frontier",
        serialize_with = "serialize_frontier"
    )]
    pub frontier: Antichain<Timestamp>,
    #[serde(default)]
    pub version: u64,
}

fn serialize_frontier<S>(frontier: &Antichain<Timestamp>, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    Serialize::serialize(frontier.elements(), serializer)
}

fn deserialize_frontier<'de, D>(deserializer: D) -> Result<Antichain<Timestamp>, D::Error>
where
    D: Deserializer<'de>,
{
    let times: Vec<Timestamp> = Deserialize::deserialize(deserializer)?;
    Ok(Antichain::from(times))
}

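/// Parses a payload from the progress topic, falling back to the legacy
/// timestamp-based format when the current frontier-based format does not
/// apply.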
fn parse_progress_record(payload: &[u8]) -> Result<ProgressRecord, anyhow::Error> {
    Ok(match serde_json::from_slice::<ProgressRecord>(payload) {
        Ok(progress) => progress,
        Err(_) => match serde_json::from_slice::<LegacyProgressRecord>(payload) {
            Ok(LegacyProgressRecord {
                timestamp: Some(Some(time)),
            }) => ProgressRecord {
                frontier: Antichain::from_elem(time.step_forward()),
                version: 0,
            },
            Ok(LegacyProgressRecord {
                timestamp: Some(None),
            }) => ProgressRecord {
                frontier: Antichain::new(),
                version: 0,
            },
            _ => match std::str::from_utf8(payload) {
                Ok(payload) => bail!("invalid progress record: {payload}"),
                Err(_) => bail!("invalid progress record bytes: {payload:?}"),
            },
        },
    })
}

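/// Fetches the partition count for the given topic, failing if the topic does
/// not exist or reports zero partitions.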
async fn fetch_partition_count(
    producer: &ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    sink_id: GlobalId,
    topic_name: &str,
) -> Result<u64, anyhow::Error> {
    let meta = task::spawn_blocking(|| format!("kafka_sink_fetch_partition_count:{sink_id}"), {
        let producer = producer.clone();
        move || {
            producer
                .client()
                .fetch_metadata(None, DEFAULT_FETCH_METADATA_TIMEOUT)
        }
    })
    .await
    .check_ssh_status(producer.context())?;

    match meta.topics().iter().find(|t| t.name() == topic_name) {
        Some(topic) => {
            let partition_count = u64::cast_from(topic.partitions().len());
            if partition_count == 0 {
                bail!("topic {topic_name} has an impossible partition count of zero");
            }
            Ok(partition_count)
        }
        None => bail!("topic {topic_name} does not exist"),
    }
}

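/// Periodically refreshes the partition count for the data topic and reports
/// it through `update_partition_count`. Errors are logged and retried on the
/// next tick.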
async fn fetch_partition_count_loop<F>(
    producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    sink_id: GlobalId,
    topic_name: String,
    interval: Duration,
    update_partition_count: Arc<F>,
) where
    F: Fn(u64),
{
    let mut interval = time::interval(interval);
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;
        match fetch_partition_count(&producer, sink_id, &topic_name).await {
            Ok(pc) => update_partition_count(pc),
            Err(e) => {
                warn!(%sink_id, "failed updating partition count: {e}");
                continue;
            }
        };
    }
}

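/// Encodes the input collection into key/value byte payloads using the sink's
/// configured formats and envelope, publishing Avro schemas to the schema
/// registry where required. The operator uses a `Pipeline` pact, so rows are
/// encoded on whichever worker produced them.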
fn encode_collection<'scope, T: timely::progress::Timestamp>(
    name: String,
    input: VecCollection<'scope, T, (Option<Row>, DiffPair<Row>), Diff>,
    envelope: SinkEnvelope,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
) -> (
    VecCollection<'scope, T, KafkaMessage, Diff>,
    StreamVec<'scope, T, HealthStatusMessage>,
    PressOnDropButton,
) {
    let mut builder = AsyncOperatorBuilder::new(name, input.inner.scope());

    let (output, stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let mut input = builder.new_input_for(input.inner, Pipeline, &output);

    let (button, errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let key_desc = connection
                .key_desc_and_indices
                .as_ref()
                .map(|(desc, _indices)| desc.clone());
            let value_desc = connection.value_desc;

            let key_encoder: Option<Box<dyn Encode>> =
                match (key_desc, connection.format.key_format) {
                    (Some(desc), Some(KafkaSinkFormatType::Bytes)) => {
                        Some(Box::new(BinaryEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Text)) => {
                        Some(Box::new(TextEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Json)) => {
                        Some(Box::new(JsonEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Avro {
                        schema,
                        compatibility_level,
                        csr_connection,
                    })) => {
                        let ccsr = csr_connection
                            .connect(&storage_configuration, InTask::Yes)
                            .await?;

                        let schema_id = mz_storage_client::sink::publish_kafka_schema(
                            ccsr,
                            format!("{}-key", connection.topic),
                            schema.clone(),
                            mz_ccsr::SchemaType::Avro,
                            compatibility_level,
                        )
                        .await
                        .context("error publishing kafka schemas for sink")?;

                        Some(Box::new(AvroEncoder::new(desc, false, &schema, schema_id)))
                    }
                    (None, None) => None,
                    (desc, format) => {
                        return Err(anyhow!(
                            "key_desc and key_format must be both set or both unset, but key_desc: {:?}, key_format: {:?}",
                            desc,
                            format
                        ))
                    }
                };

            let debezium = matches!(envelope, SinkEnvelope::Debezium);

            let value_encoder: Box<dyn Encode> = match connection.format.value_format {
                KafkaSinkFormatType::Bytes => Box::new(BinaryEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Text => Box::new(TextEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Json => Box::new(JsonEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level,
                    csr_connection,
                } => {
                    let ccsr = csr_connection
                        .connect(&storage_configuration, InTask::Yes)
                        .await?;

                    let schema_id = mz_storage_client::sink::publish_kafka_schema(
                        ccsr,
                        format!("{}-value", connection.topic),
                        schema.clone(),
                        mz_ccsr::SchemaType::Avro,
                        compatibility_level,
                    )
                    .await
                    .context("error publishing kafka schemas for sink")?;

                    Box::new(AvroEncoder::new(value_desc, debezium, &schema, schema_id))
                }
            };

            *capset = CapabilitySet::new();

            let mut row_buf = Row::default();
            let mut datums = DatumVec::new();

            while let Some(event) = input.next().await {
                if let Event::Data(cap, rows) = event {
                    for ((key, value), time, diff) in rows {
                        let mut hash = None;
                        let mut headers = vec![];
                        if connection.headers_index.is_some() || connection.partition_by.is_some() {
                            let row = value
                                .after
                                .as_ref()
                                .or(value.before.as_ref())
                                .expect("one of before or after must be set");
                            let row = datums.borrow_with(row);

                            if let Some(i) = connection.headers_index {
                                headers = encode_headers(row[i]);
                            }

                            if let Some(partition_by) = &connection.partition_by {
                                hash = Some(evaluate_partition_by(partition_by, &row));
                            }
                        }
                        let (key, hash) = match key {
                            Some(key) => {
                                let key_encoder = key_encoder.as_ref().expect("key present");
                                let key = key_encoder.encode_unchecked(key);
                                let hash = hash.unwrap_or_else(|| key_encoder.hash(&key));
                                (Some(key), hash)
                            }
                            None => (None, hash.unwrap_or(0)),
                        };
                        let value = match envelope {
                            SinkEnvelope::Upsert => value.after,
                            SinkEnvelope::Debezium => {
                                dbz_format(&mut row_buf.packer(), value);
                                Some(row_buf.clone())
                            }
                            SinkEnvelope::Append => {
                                unreachable!("Append envelope is not valid for Kafka sinks")
                            }
                        };
                        let value = value.map(|value| value_encoder.encode_unchecked(value));
                        let message = KafkaMessage {
                            hash,
                            key,
                            value,
                            headers,
                        };
                        output.give(&cap, (message, time, diff));
                    }
                }
            }
            Ok::<(), anyhow::Error>(())
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Kafka,
    });

    (stream.as_collection(), statuses, button.press_on_drop())
}

fn encode_headers(datum: Datum) -> Vec<KafkaHeader> {
    let mut out = vec![];
    if datum.is_null() {
        return out;
    }
    for (key, value) in datum.unwrap_map().iter() {
        out.push(KafkaHeader {
            key: key.into(),
            value: match value {
                Datum::Null => None,
                Datum::String(s) => Some(s.as_bytes().to_vec()),
                Datum::Bytes(b) => Some(b.to_vec()),
                _ => panic!("encode_headers called with unexpected header value {value:?}"),
            },
        })
    }
    out
}

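/// Evaluates the sink's `PARTITION BY` expression for a row and returns the
/// resulting hash. Evaluation errors and non-`UInt64` results (expected to be
/// `null`) map to 0, so such rows all route to the same partition instead of
/// failing the sink.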
fn evaluate_partition_by(partition_by: &MirScalarExpr, row: &[Datum]) -> u64 {
    let temp_storage = RowArena::new();
    match partition_by.eval(row, &temp_storage) {
        Ok(Datum::UInt64(u)) => u,
        Ok(datum) => {
            soft_assert_or_log!(datum.is_null(), "unexpected partition_by result: {datum:?}");
            0
        }
        Err(_) => 0,
    }
}

#[cfg(test)]
mod test {
    use mz_ore::assert_err;

    use super::*;

    #[mz_ore::test]
    fn progress_record_migration() {
        assert_err!(parse_progress_record(b"{}"));

        assert_eq!(
            parse_progress_record(b"{\"timestamp\":1}").unwrap(),
            ProgressRecord {
                frontier: Antichain::from_elem(2.into()),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"timestamp\":null}").unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"frontier\":[1]}").unwrap(),
            ProgressRecord {
                frontier: Antichain::from_elem(1.into()),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"frontier\":[]}").unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"frontier\":[], \"version\": 42}").unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 42,
            }
        );

        assert_err!(parse_progress_record(b"{\"frontier\":null}"));
    }
}