use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::future::Future;
use std::rc::Rc;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Weak};
use std::time::Duration;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::metrics::sink::kafka::KafkaSinkMetrics;
use crate::render::sinks::{PkViolationWarner, SinkBatchStream, SinkRender};
use crate::statistics::SinkStatistics;
use crate::storage_state::StorageState;
use anyhow::{Context, anyhow, bail};
use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::StreamExt;
use maplit::btreemap;
use mz_expr::MirScalarExpr;
use mz_interchange::avro::AvroEncoder;
use mz_interchange::encode::Encode;
use mz_interchange::envelopes::{dbz_format, for_each_diff_pair};
use mz_interchange::json::JsonEncoder;
use mz_interchange::text_binary::{BinaryEncoder, TextEncoder};
use mz_kafka_util::admin::EnsureTopicConfig;
use mz_kafka_util::client::{
    DEFAULT_FETCH_METADATA_TIMEOUT, GetPartitionsError, MzClientContext, TimeoutConfig,
    TunnelingClientContext,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::soft_assert_or_log;
use mz_ore::task::{self, AbortOnDropHandle};
use mz_persist_client::Diagnostics;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_client::sink::progress_key::ProgressKey;
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::{
    KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS, SINK_ENSURE_TOPIC_CONFIG, SINK_PROGRESS_SEARCH,
};
use mz_storage_types::errors::{ContextCreationError, ContextCreationErrorExt, DataflowError};
use mz_storage_types::sinks::{
    KafkaSinkConnection, KafkaSinkFormatType, SinkEnvelope, StorageSinkDesc,
};
use mz_storage_types::sources::SourceData;
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
use rdkafka::error::KafkaError;
use rdkafka::message::{Header, OwnedHeaders, ToBytes};
use rdkafka::producer::{BaseRecord, Producer, ThreadedProducer};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::{Message, Offset, Statistics, TopicPartitionList};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::StreamVec;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::vec::{Map, ToStream};
use timely::dataflow::operators::{CapabilitySet, Concatenate};
use timely::progress::{Antichain, Timestamp as _};
use tokio::sync::watch;
use tokio::time::{self, MissedTickBehavior};
use tracing::{debug, error, info, warn};

impl<'scope> SinkRender<'scope> for KafkaSinkConnection {
    fn get_key_indices(&self) -> Option<&[usize]> {
        self.key_desc_and_indices
            .as_ref()
            .map(|(_desc, indices)| indices.as_slice())
    }

    fn get_relation_key_indices(&self) -> Option<&[usize]> {
        self.relation_key_indices.as_deref()
    }

    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        batches: SinkBatchStream<'scope>,
        key_is_synthetic: bool,
        _err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
    ) -> (
        StreamVec<'scope, Timestamp, HealthStatusMessage>,
        Vec<PressOnDropButton>,
    ) {
        let scope = batches.scope();

        let write_handle = {
            let persist = Arc::clone(&storage_state.persist_clients);
            let shard_meta = sink.to_storage_metadata.clone();
            async move {
                let client = persist.open(shard_meta.persist_location).await?;
                let handle = client
                    .open_writer(
                        shard_meta.data_shard,
                        Arc::new(shard_meta.relation_desc),
                        Arc::new(UnitSchema),
                        Diagnostics::from_purpose("sink handle"),
                    )
                    .await?;
                Ok(handle)
            }
        };

        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        storage_state
            .sink_write_frontiers
            .insert(sink_id, Rc::clone(&write_frontier));

        let (encoded, encode_status, encode_token) = encode_collection(
            format!("kafka-{sink_id}-{}-encode", self.format.get_format_name()),
            batches,
            sink.envelope,
            self.clone(),
            storage_state.storage_configuration.clone(),
            sink_id,
            sink.from,
            key_is_synthetic,
        );

        let metrics = storage_state.metrics.get_kafka_sink_metrics(sink_id);
        let statistics = storage_state
            .aggregated_statistics
            .get_sink(&sink_id)
            .expect("statistics initialized")
            .clone();

        let (sink_status, sink_token) = sink_collection(
            format!("kafka-{sink_id}-sink"),
            encoded,
            sink_id,
            self.clone(),
            storage_state.storage_configuration.clone(),
            sink,
            metrics,
            statistics,
            write_handle,
            write_frontier,
        );

        let running_status = Some(HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::Running,
            namespace: StatusNamespace::Kafka,
        })
        .to_stream(scope);

        let status = scope.concatenate([running_status, encode_status, sink_status]);

        (status, vec![encode_token, sink_token])
    }
}

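/// A Kafka producer that writes the sink's data to the data topic and progress records to the
/// progress topic, all within Kafka transactions.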
struct TransactionalProducer {
    /// The task name used for any blocking calls spawned onto the tokio threadpool.
    task_name: String,
    /// The topic all data updates are written to.
    data_topic: String,
    /// The topic all progress records are written to.
    progress_topic: String,
    /// The key that progress records for this sink are written under.
    progress_key: ProgressKey,
    /// The version of this sink, recorded in progress records to fence older versions.
    sink_version: u64,
    /// The current partition count of the data topic, kept up to date in the background.
    partition_count: Arc<AtomicU64>,
    /// The background task that periodically refreshes `partition_count`.
    _partition_count_task: AbortOnDropHandle<()>,
    /// The underlying librdkafka producer.
    producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    /// Shared statistics for this sink, used to report staged and committed messages/bytes.
    statistics: SinkStatistics,
    /// The number of messages staged in the currently open transaction. Reset to zero on every
    /// successful commit.
    staged_messages: u64,
    /// The total size of the messages staged in the currently open transaction. Reset to zero
    /// on every successful commit.
    staged_bytes: u64,
    /// The timeout to use for network operations.
    socket_timeout: Duration,
    /// The timeout to use for committing transactions.
    transaction_timeout: Duration,
}

impl TransactionalProducer {
    /// Creates a transactional producer for the given sink, initializes transactions, and
    /// determines the frontier at which the sink should resume producing data.
    async fn new(
        sink_id: GlobalId,
        connection: &KafkaSinkConnection,
        storage_configuration: &StorageConfiguration,
        metrics: Arc<KafkaSinkMetrics>,
        statistics: SinkStatistics,
        sink_version: u64,
    ) -> Result<(Self, Antichain<mz_repr::Timestamp>), ContextCreationError> {
        let client_id = connection.client_id(
            storage_configuration.config_set(),
            &storage_configuration.connection_context,
            sink_id,
        );
        let transactional_id =
            connection.transactional_id(&storage_configuration.connection_context, sink_id);

        let timeout_config = &storage_configuration.parameters.kafka_timeout_config;
        let mut options = BTreeMap::new();
        options.insert("enable.idempotence", "true".into());
        options.insert(
            "compression.type",
            connection.compression_type.to_librdkafka_option().into(),
        );
        options.insert("queue.buffering.max.kbytes", "2147483647".into());
        options.insert("queue.buffering.max.messages", "0".into());
        options.insert("queue.buffering.max.ms", format!("{}", 10));
        options.insert(
            "transaction.timeout.ms",
            format!("{}", timeout_config.transaction_timeout.as_millis()),
        );
        options.insert("transactional.id", transactional_id);
        options.insert("client.id", client_id);
        options.insert("statistics.interval.ms", "1000".into());

        let ctx = MzClientContext::default();

        let stats_receiver = ctx.subscribe_statistics();
        let task_name = format!("kafka_sink_metrics_collector:{sink_id}");
        task::spawn(
            || &task_name,
            collect_statistics(stats_receiver, Arc::clone(&metrics)),
        );

        let producer: ThreadedProducer<_> = connection
            .connection
            .create_with_context(storage_configuration, ctx, &options, InTask::Yes)
            .await?;

        // Keep the topic's partition count up to date in the background so that `send` can
        // route messages to the correct partition.
        let partition_count = Arc::new(AtomicU64::new(0));
        let update_partition_count = {
            let partition_count = Arc::clone(&partition_count);
            let metrics = Arc::clone(&metrics);
            Arc::new(move |pc| {
                partition_count.store(pc, std::sync::atomic::Ordering::SeqCst);
                metrics.partition_count.set(pc);
            })
        };

        let partition_count_task = task::spawn(
            || format!("kafka_sink_producer_fetch_metadata_loop:{sink_id}"),
            fetch_partition_count_loop(
                producer.clone(),
                sink_id,
                connection.topic.clone(),
                connection.topic_metadata_refresh_interval,
                Arc::clone(&update_partition_count),
            ),
        );

        let task_name = format!("kafka_sink_producer:{sink_id}");
        let progress_key = ProgressKey::new(sink_id);

        let producer = Self {
            task_name,
            data_topic: connection.topic.clone(),
            partition_count,
            _partition_count_task: partition_count_task.abort_on_drop(),
            progress_topic: connection
                .progress_topic(&storage_configuration.connection_context)
                .into_owned(),
            progress_key,
            sink_version,
            producer,
            statistics,
            staged_messages: 0,
            staged_bytes: 0,
            socket_timeout: timeout_config.socket_timeout,
            transaction_timeout: timeout_config.transaction_timeout,
        };

        let timeout = timeout_config.socket_timeout;
        producer
            .spawn_blocking(move |p| p.init_transactions(timeout))
            .await?;

        let progress = determine_sink_progress(
            sink_id,
            connection,
            storage_configuration,
            Arc::clone(&metrics),
        )
        .await?;

        let resume_upper = match progress {
            Some(progress) => {
                if sink_version < progress.version {
                    return Err(ContextCreationError::Other(anyhow!(
                        "Fenced off by newer version of the sink. ours={} theirs={}",
                        sink_version,
                        progress.version
                    )));
                }
                progress.frontier
            }
            None => {
                // There is no progress record, so this sink is starting fresh: make sure the
                // data topic exists and resume from the minimum timestamp.
                mz_storage_client::sink::ensure_kafka_topic(
                    connection,
                    storage_configuration,
                    &connection.topic,
                    &connection.topic_options,
                    EnsureTopicConfig::Skip,
                )
                .await?;
                Antichain::from_elem(Timestamp::minimum())
            }
        };

        // Fetch the partition count once up front so that it is known before the first message
        // is sent.
        let partition_count =
            fetch_partition_count(&producer.producer, sink_id, &connection.topic).await?;
        update_partition_count(partition_count);

        Ok((producer, resume_upper))
    }

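    /// Runs the blocking operation `f` on the producer in the tokio threadpool and checks the
    /// result for SSH tunnel errors.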
    async fn spawn_blocking<F, R>(&self, f: F) -> Result<R, ContextCreationError>
    where
        F: FnOnce(
                ThreadedProducer<TunnelingClientContext<MzClientContext>>,
            ) -> Result<R, KafkaError>
            + Send
            + 'static,
        R: Send + 'static,
    {
        let producer = self.producer.clone();
        task::spawn_blocking(|| &self.task_name, move || f(producer))
            .await
            .check_ssh_status(self.producer.context())
    }

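    /// Begins a new transaction on the underlying producer.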
    async fn begin_transaction(&self) -> Result<(), ContextCreationError> {
        self.spawn_blocking(|p| p.begin_transaction()).await
    }

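    /// Stages a message in the currently open transaction by handing it to librdkafka's send
    /// queue. A `materialize-timestamp` header is attached automatically, and user-provided
    /// headers with a `materialize-` prefix are dropped.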
    fn send(
        &mut self,
        message: &KafkaMessage,
        time: Timestamp,
        diff: Diff,
    ) -> Result<(), KafkaError> {
        assert_eq!(diff, Diff::ONE, "invalid sink update");

        let mut headers = OwnedHeaders::new().insert(Header {
            key: "materialize-timestamp",
            value: Some(time.to_string().as_bytes()),
        });
        for header in &message.headers {
            // Headers in the `materialize-` namespace are reserved for our own use.
            if header.key.starts_with("materialize-") {
                continue;
            }

            headers = headers.insert(Header {
                key: header.key.as_str(),
                value: header.value.as_ref(),
            });
        }

        let pc = self
            .partition_count
            .load(std::sync::atomic::Ordering::SeqCst);
        let partition = Some(i32::try_from(message.hash % pc).unwrap());

        let record = BaseRecord {
            topic: &self.data_topic,
            key: message.key.as_ref(),
            payload: message.value.as_ref(),
            headers: Some(headers),
            partition,
            timestamp: None,
            delivery_opaque: (),
        };
        let key_size = message.key.as_ref().map(|k| k.len()).unwrap_or(0);
        let value_size = message.value.as_ref().map(|v| v.len()).unwrap_or(0);
        let headers_size = message
            .headers
            .iter()
            .map(|h| h.key.len() + h.value.as_ref().map(|v| v.len()).unwrap_or(0))
            .sum::<usize>();
        let record_size = u64::cast_from(key_size + value_size + headers_size);
        self.statistics.inc_messages_staged_by(1);
        self.staged_messages += 1;
        self.statistics.inc_bytes_staged_by(record_size);
        self.staged_bytes += record_size;
        self.producer.send(record).map_err(|(e, _)| e)
    }

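    /// Commits the currently open transaction, first appending a progress record with the given
    /// upper to the progress topic. On success, the staged statistics are reported as
    /// committed; on an abortable transaction error, the transaction is aborted before the
    /// error is returned.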
    async fn commit_transaction(
        &mut self,
        upper: Antichain<Timestamp>,
    ) -> Result<(), ContextCreationError> {
        let progress = ProgressRecord {
            frontier: upper,
            version: self.sink_version,
        };
        let payload = serde_json::to_vec(&progress).expect("infallible");
        let record = BaseRecord::to(&self.progress_topic)
            .payload(&payload)
            .key(&self.progress_key);
        self.producer.send(record).map_err(|(e, _)| e)?;

        fail::fail_point!("kafka_sink_commit_transaction");

        let timeout = self.transaction_timeout;
        match self
            .spawn_blocking(move |p| p.commit_transaction(timeout))
            .await
        {
            Ok(()) => {
                self.statistics
                    .inc_messages_committed_by(self.staged_messages);
                self.statistics.inc_bytes_committed_by(self.staged_bytes);
                self.staged_messages = 0;
                self.staged_bytes = 0;
                Ok(())
            }
            Err(ContextCreationError::KafkaError(KafkaError::Transaction(err))) => {
                if err.txn_requires_abort() {
                    let timeout = self.socket_timeout;
                    self.spawn_blocking(move |p| p.abort_transaction(timeout))
                        .await?;
                }
                Err(ContextCreationError::KafkaError(KafkaError::Transaction(
                    err,
                )))
            }
            Err(err) => Err(err),
        }
    }
}

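/// Listens for statistics updates from librdkafka and mirrors them into the sink's metrics.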
async fn collect_statistics(
    mut receiver: watch::Receiver<Statistics>,
    metrics: Arc<KafkaSinkMetrics>,
) {
    let mut outbuf_cnt: i64 = 0;
    let mut outbuf_msg_cnt: i64 = 0;
    let mut waitresp_cnt: i64 = 0;
    let mut waitresp_msg_cnt: i64 = 0;
    let mut txerrs: u64 = 0;
    let mut txretries: u64 = 0;
    let mut req_timeouts: u64 = 0;
    let mut connects: i64 = 0;
    let mut disconnects: i64 = 0;
    while receiver.changed().await.is_ok() {
        let stats = receiver.borrow();
        for broker in stats.brokers.values() {
            outbuf_cnt += broker.outbuf_cnt;
            outbuf_msg_cnt += broker.outbuf_msg_cnt;
            waitresp_cnt += broker.waitresp_cnt;
            waitresp_msg_cnt += broker.waitresp_msg_cnt;
            txerrs += broker.txerrs;
            txretries += broker.txretries;
            req_timeouts += broker.req_timeouts;
            connects += broker.connects.unwrap_or(0);
            disconnects += broker.disconnects.unwrap_or(0);
        }
        metrics.rdkafka_msg_cnt.set(stats.msg_cnt);
        metrics.rdkafka_msg_size.set(stats.msg_size);
        metrics.rdkafka_txmsgs.set(stats.txmsgs);
        metrics.rdkafka_txmsg_bytes.set(stats.txmsg_bytes);
        metrics.rdkafka_tx.set(stats.tx);
        metrics.rdkafka_tx_bytes.set(stats.tx_bytes);
        metrics.rdkafka_outbuf_cnt.set(outbuf_cnt);
        metrics.rdkafka_outbuf_msg_cnt.set(outbuf_msg_cnt);
        metrics.rdkafka_waitresp_cnt.set(waitresp_cnt);
        metrics.rdkafka_waitresp_msg_cnt.set(waitresp_msg_cnt);
        metrics.rdkafka_txerrs.set(txerrs);
        metrics.rdkafka_txretries.set(txretries);
        metrics.rdkafka_req_timeouts.set(req_timeouts);
        metrics.rdkafka_connects.set(connects);
        metrics.rdkafka_disconnects.set(disconnects);
    }
}

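/// A fully encoded message destined for Kafka, together with the hash used to pick its
/// destination partition.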
#[derive(Debug, Clone, Serialize, Deserialize)]
struct KafkaMessage {
    /// The hash used to pick the destination partition, derived from the key or the
    /// `PARTITION BY` expression.
    hash: u64,
    /// The encoded message key, if any.
    key: Option<Vec<u8>>,
    /// The encoded message value, if any.
    value: Option<Vec<u8>>,
    /// Additional headers to attach to the message.
    headers: Vec<KafkaHeader>,
}

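/// A single Kafka message header: a key and an optional value.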
#[derive(Debug, Clone, Serialize, Deserialize)]
struct KafkaHeader {
    key: String,
    value: Option<Vec<u8>>,
}

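/// Sinks a collection of encoded messages to Kafka.
///
/// All updates are exchanged to a single worker, which writes them to Kafka inside transactions
/// that also append a progress record to the progress topic. After each successful commit the
/// operator advances the persist shard's upper and the shared write frontier.
///
/// Returns a stream of health status updates and a token that shuts the operator down when
/// dropped.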
fn sink_collection<'scope>(
    name: String,
    input: VecCollection<'scope, Timestamp, KafkaMessage, Diff>,
    sink_id: GlobalId,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    metrics: KafkaSinkMetrics,
    statistics: SinkStatistics,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
) -> (
    StreamVec<'scope, Timestamp, HealthStatusMessage>,
    PressOnDropButton,
) {
    let scope = input.scope();
    let mut builder = AsyncOperatorBuilder::new(name.clone(), input.inner.scope());

    // The sink is written by a single worker, chosen by hashing the sink id.
    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let buffer_min_capacity =
        KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS.handle(storage_configuration.config_set());

    let mut input = builder.new_disconnected_input(input.inner, Exchange::new(move |_| hashed_id));

    let as_of = sink.as_of.clone();
    let sink_version = sink.version;
    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            if !is_active_worker {
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            fail::fail_point!("kafka_sink_creation_error", |_| Err(
                ContextCreationError::Other(anyhow::anyhow!("synthetic error"))
            ));

            let mut write_handle = write_handle.await?;

            let metrics = Arc::new(metrics);

            let (mut producer, resume_upper) = TransactionalProducer::new(
                sink_id,
                &connection,
                &storage_configuration,
                Arc::clone(&metrics),
                statistics,
                sink_version,
            )
            .await?;

            // The input has overcompacted if the as_of is not at or beyond the resume upper,
            // in which case the updates needed to resume correctly cannot be reconstructed.
            let overcompacted = *resume_upper != [Timestamp::minimum()]
                && !PartialOrder::less_equal(&as_of, &resume_upper);
            if overcompacted {
                let err = format!(
                    "{name}: input compacted past resume upper: as_of {}, resume_upper: {}",
                    as_of.pretty(),
                    resume_upper.pretty()
                );
                error!("{err}");
                return Err(anyhow!("{err}").into());
            }

            info!(
                "{name}: as_of: {}, resume upper: {}",
                as_of.pretty(),
                resume_upper.pretty()
            );

            let Some(mut upper) = resume_upper.clone().into_option() else {
                // The resume upper is empty, so the sink has already written everything it ever
                // will.
                write_frontier.borrow_mut().clear();
                return Ok(());
            };

            let mut deferred_updates = vec![];
            let mut extra_updates = vec![];
            let mut transaction_begun = false;
            while let Some(event) = input.next().await {
                match event {
                    Event::Data(_cap, batch) => {
                        for (message, time, diff) in batch {
                            // Updates at the current upper are sent immediately; updates beyond
                            // it are deferred until their time is ready to commit; updates below
                            // it were committed by a previous incarnation and are skipped.
                            match upper.cmp(&time) {
                                Ordering::Less => deferred_updates.push((message, time, diff)),
                                Ordering::Equal => {
                                    if !transaction_begun {
                                        producer.begin_transaction().await?;
                                        transaction_begun = true;
                                    }
                                    producer.send(&message, time, diff)?;
                                }
                                Ordering::Greater => continue,
                            }
                        }
                    }
                    Event::Progress(progress) => {
                        // Ignore progress updates before the resume upper; that data was already
                        // committed by a previous incarnation of this sink.
                        if !PartialOrder::less_equal(&resume_upper, &progress) {
                            continue;
                        }
                        // Don't commit a transaction until the frontier has passed the as_of.
                        if !as_of.iter().all(|t| !progress.less_equal(t)) {
                            continue;
                        }
                        if !transaction_begun {
                            producer.begin_transaction().await?;
                        }

                        extra_updates.extend(
                            deferred_updates
                                .extract_if(.., |(_, time, _)| !progress.less_equal(time)),
                        );
                        deferred_updates.shrink_to(buffer_min_capacity.get());
                        extra_updates.sort_unstable_by(|a, b| a.1.cmp(&b.1));

                        for (message, time, diff) in extra_updates.drain(..) {
                            producer.send(&message, time, diff)?;
                        }
                        extra_updates.shrink_to(buffer_min_capacity.get());

                        debug!("{name}: committing transaction for {}", progress.pretty());
                        producer.commit_transaction(progress.clone()).await?;
                        transaction_begun = false;
                        // Advance the persist shard's upper to the committed progress.
                        let mut expect_upper = write_handle.shared_upper();
                        loop {
                            if PartialOrder::less_equal(&progress, &expect_upper) {
                                break;
                            }
                            const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                            match write_handle
                                .compare_and_append(EMPTY, expect_upper, progress.clone())
                                .await
                                .expect("valid usage")
                            {
                                Ok(()) => break,
                                Err(mismatch) => {
                                    expect_upper = mismatch.current;
                                }
                            }
                        }
                        write_frontier.borrow_mut().clone_from(&progress);
                        match progress.into_option() {
                            Some(new_upper) => upper = new_upper,
                            None => break,
                        }
                    }
                }
            }
            Ok(())
        })
    });

    let statuses = errors.map(|error: Rc<ContextCreationError>| {
        let hint = match *error {
            ContextCreationError::KafkaError(KafkaError::Transaction(ref e)) => {
                if e.is_retriable() && e.code() == RDKafkaErrorCode::OperationTimedOut {
                    let hint = "If you're running a single Kafka broker, ensure that the configs \
                        transaction.state.log.replication.factor, transaction.state.log.min.isr, \
                        and offsets.topic.replication.factor are set to 1 on the broker";
                    Some(hint.to_owned())
                } else {
                    None
                }
            }
            _ => None,
        };

        HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), hint),
            namespace: if matches!(*error, ContextCreationError::Ssh(_)) {
                StatusNamespace::Ssh
            } else {
                StatusNamespace::Kafka
            },
        }
    });

    (statuses, button.press_on_drop())
}

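/// Determines the latest progress record for this sink from the progress topic, if any.
///
/// Returns `None` if the progress topic does not exist yet or contains no record for this
/// sink's progress key, in which case the sink starts fresh.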
async fn determine_sink_progress(
    sink_id: GlobalId,
    connection: &KafkaSinkConnection,
    storage_configuration: &StorageConfiguration,
    metrics: Arc<KafkaSinkMetrics>,
) -> Result<Option<ProgressRecord>, ContextCreationError> {
    let TimeoutConfig {
        fetch_metadata_timeout,
        progress_record_fetch_timeout,
        ..
    } = storage_configuration.parameters.kafka_timeout_config;

    let client_id = connection.client_id(
        storage_configuration.config_set(),
        &storage_configuration.connection_context,
        sink_id,
    );
    let group_id =
        connection.progress_group_id(&storage_configuration.connection_context, sink_id);
    let progress_topic = connection
        .progress_topic(&storage_configuration.connection_context)
        .into_owned();
    let progress_topic_options = &connection.connection.progress_topic_options;
    let progress_key = ProgressKey::new(sink_id);

    let common_options = btreemap! {
        "group.id" => group_id,
        "client.id" => client_id,
        "enable.auto.commit" => "false".into(),
        "auto.offset.reset" => "earliest".into(),
        "enable.partition.eof" => "true".into(),
    };

    // Two consumers for the progress topic: one that reads only committed records (the source
    // of truth) and one that reads uncommitted records (used to fetch watermarks).
    let progress_client_read_committed: BaseConsumer<_> = {
        let mut opts = common_options.clone();
        opts.insert("isolation.level", "read_committed".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    let progress_client_read_uncommitted: BaseConsumer<_> = {
        let mut opts = common_options;
        opts.insert("isolation.level", "read_uncommitted".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    let ctx = Arc::clone(progress_client_read_committed.client().context());

    let ensure_topic_config =
        match &*SINK_ENSURE_TOPIC_CONFIG.get(storage_configuration.config_set()) {
            "skip" => EnsureTopicConfig::Skip,
            "check" => EnsureTopicConfig::Check,
            "alter" => EnsureTopicConfig::Alter,
            _ => {
                tracing::warn!(
                    topic = progress_topic,
                    "unexpected value for ensure-topic-config; skipping checks"
                );
                EnsureTopicConfig::Skip
            }
        };
    mz_storage_client::sink::ensure_kafka_topic(
        connection,
        storage_configuration,
        &progress_topic,
        progress_topic_options,
        ensure_topic_config,
    )
    .await
    .add_context("error registering kafka progress topic for sink")?;

    // The parent token is held across the await below. If this future is dropped, the token
    // goes away and the blocking task notices (via the weak child token) that it should stop
    // early.
    let parent_token = Arc::new(());
    let child_token = Arc::downgrade(&parent_token);
    let task_name = format!("get_latest_ts:{sink_id}");
    let sink_progress_search = SINK_PROGRESS_SEARCH.get(storage_configuration.config_set());
    let result = task::spawn_blocking(|| task_name, move || {
        let progress_topic = progress_topic.as_ref();
        let partitions = match mz_kafka_util::client::get_partitions(
            progress_client_read_committed.client(),
            progress_topic,
            fetch_metadata_timeout,
        ) {
            Ok(partitions) => partitions,
            Err(GetPartitionsError::TopicDoesNotExist) => {
                // The progress topic doesn't exist, so there is no committed progress for this
                // sink yet.
                return Ok(None);
            }
            e => e.with_context(|| {
                format!(
                    "Unable to fetch metadata about progress topic {}",
                    progress_topic
                )
            })?,
        };
        if partitions.len() != 1 {
            bail!(
                "Progress topic {} should contain a single partition, but instead contains {} partitions",
                progress_topic, partitions.len(),
            );
        }
        let partition = partitions.into_element();

        metrics.consumed_progress_records.set(0);

        let (lo, hi) = progress_client_read_uncommitted
            .fetch_watermarks(progress_topic, partition, fetch_metadata_timeout)
            .map_err(|e| {
                anyhow!(
                    "Failed to fetch metadata while reading from progress topic: {}",
                    e
                )
            })?;

        // Optionally search the tail of the topic first: try progressively larger lookbacks
        // from the high watermark before falling back to a full scan from `lo`.
        let mut start_indices = vec![lo];
        if sink_progress_search {
            let mut lookback = hi.saturating_sub(lo) / 10;
            while lookback >= 20_000 {
                start_indices.push(hi - lookback);
                lookback /= 10;
            }
        }
        for lo in start_indices.into_iter().rev() {
            if let Some(found) = progress_search(
                &progress_client_read_committed,
                progress_record_fetch_timeout,
                progress_topic,
                partition,
                lo,
                hi,
                progress_key.clone(),
                Weak::clone(&child_token),
                Arc::clone(&metrics),
            )? {
                return Ok(Some(found));
            }
        }
        Ok(None)
    }).await.check_ssh_status(&ctx);
    drop(parent_token);
    result
}

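/// Scans the progress topic partition from offset `lo` towards `hi` and returns the last
/// progress record for `progress_key` that it observes, if any.
///
/// Fails if a progress record's frontier regresses relative to an earlier record, or if the
/// operation is cancelled via `child_token`.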
fn progress_search<C: ConsumerContext + 'static>(
    progress_client_read_committed: &BaseConsumer<C>,
    progress_record_fetch_timeout: Duration,
    progress_topic: &str,
    partition: i32,
    lo: i64,
    hi: i64,
    progress_key: ProgressKey,
    child_token: Weak<()>,
    metrics: Arc<KafkaSinkMetrics>,
) -> anyhow::Result<Option<ProgressRecord>> {
    // Seek to the starting offset in the progress topic partition.
    let mut tps = TopicPartitionList::new();
    tps.add_partition(progress_topic, partition);
    tps.set_partition_offset(progress_topic, partition, Offset::Offset(lo))?;
    progress_client_read_committed
        .assign(&tps)
        .with_context(|| {
            format!(
                "Error seeking in progress topic {}:{}",
                progress_topic, partition
            )
        })?;

    let get_position = || {
        if child_token.strong_count() == 0 {
            bail!("operation cancelled");
        }
        let position = progress_client_read_committed
            .position()?
            .find_partition(progress_topic, partition)
            .ok_or_else(|| {
                anyhow!(
                    "No position info found for progress topic {}",
                    progress_topic
                )
            })?
            .offset();
        let position = match position {
            Offset::Offset(position) => position,
            // An `Invalid` position means no message has been read yet; report the starting
            // offset instead.
            Offset::Invalid => lo,
            _ => bail!(
                "Consumer::position returned offset of wrong type: {:?}",
                position
            ),
        };
        let outstanding = u64::try_from(std::cmp::max(0, hi - position)).unwrap();
        metrics.outstanding_progress_records.set(outstanding);
        Ok(position)
    };

    info!("fetching latest progress record for {progress_key}, lo/hi: {lo}/{hi}");

    let mut last_progress: Option<ProgressRecord> = None;
    loop {
        let current_position = get_position()?;

        if current_position >= hi {
            break;
        }

        let message = match progress_client_read_committed.poll(progress_record_fetch_timeout) {
            Some(Ok(message)) => message,
            Some(Err(KafkaError::PartitionEOF(_))) => {
                continue;
            }
            Some(Err(e)) => bail!("failed to fetch progress message {e}"),
            None => {
                bail!(
                    "timed out while waiting to reach high water mark of non-empty \
                     topic {progress_topic}:{partition}, lo/hi: {lo}/{hi}, current position: {current_position}"
                );
            }
        };

        if message.key() != Some(progress_key.to_bytes()) {
            continue;
        }

        metrics.consumed_progress_records.inc();

        let Some(payload) = message.payload() else {
            continue;
        };
        let progress = parse_progress_record(payload)?;

        match last_progress {
            Some(last_progress)
                if !PartialOrder::less_equal(&last_progress.frontier, &progress.frontier) =>
            {
                bail!(
                    "upper regressed in topic {progress_topic}:{partition} from {:?} to {:?}",
                    &last_progress.frontier,
                    &progress.frontier,
                );
            }
            _ => last_progress = Some(progress),
        }
    }

    Ok(last_progress)
}

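/// The legacy format of progress records, kept around so that progress topics written by older
/// versions of the sink can still be decoded.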
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct LegacyProgressRecord {
    // Double-wrapped `Option` so that an absent field can be distinguished from an explicit
    // `null`.
    #[serde(default, deserialize_with = "deserialize_some")]
    pub timestamp: Option<Option<Timestamp>>,
}

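/// Deserializes any value as `Some(value)` so that, combined with `#[serde(default)]`, a
/// missing field becomes `None` while an explicit `null` becomes `Some(None)`.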
fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
where
    T: Deserialize<'de>,
    D: Deserializer<'de>,
{
    Deserialize::deserialize(deserializer).map(Some)
}

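/// The current format of progress records written to the progress topic.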
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ProgressRecord {
    /// The upper frontier of the data this sink has committed to the data topic.
    #[serde(
        deserialize_with = "deserialize_frontier",
        serialize_with = "serialize_frontier"
    )]
    pub frontier: Antichain<Timestamp>,
    /// The version of the sink that wrote this record.
    #[serde(default)]
    pub version: u64,
}

fn serialize_frontier<S>(frontier: &Antichain<Timestamp>, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    Serialize::serialize(frontier.elements(), serializer)
}

fn deserialize_frontier<'de, D>(deserializer: D) -> Result<Antichain<Timestamp>, D::Error>
where
    D: Deserializer<'de>,
{
    let times: Vec<Timestamp> = Deserialize::deserialize(deserializer)?;
    Ok(Antichain::from(times))
}

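/// Parses a progress record payload, accepting both the current `ProgressRecord` format and the
/// legacy `LegacyProgressRecord` format.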
fn parse_progress_record(payload: &[u8]) -> Result<ProgressRecord, anyhow::Error> {
    Ok(match serde_json::from_slice::<ProgressRecord>(payload) {
        Ok(progress) => progress,
        Err(_) => match serde_json::from_slice::<LegacyProgressRecord>(payload) {
            Ok(LegacyProgressRecord {
                timestamp: Some(Some(time)),
            }) => ProgressRecord {
                frontier: Antichain::from_elem(time.step_forward()),
                version: 0,
            },
            Ok(LegacyProgressRecord {
                timestamp: Some(None),
            }) => ProgressRecord {
                frontier: Antichain::new(),
                version: 0,
            },
            _ => match std::str::from_utf8(payload) {
                Ok(payload) => bail!("invalid progress record: {payload}"),
                Err(_) => bail!("invalid progress record bytes: {payload:?}"),
            },
        },
    })
}

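/// Fetches the number of partitions for the given topic, returning an error if the topic does
/// not exist or reports zero partitions.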
async fn fetch_partition_count(
    producer: &ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    sink_id: GlobalId,
    topic_name: &str,
) -> Result<u64, anyhow::Error> {
    let meta = task::spawn_blocking(|| format!("kafka_sink_fetch_partition_count:{sink_id}"), {
        let producer = producer.clone();
        move || {
            producer
                .client()
                .fetch_metadata(None, DEFAULT_FETCH_METADATA_TIMEOUT)
        }
    })
    .await
    .check_ssh_status(producer.context())?;

    match meta.topics().iter().find(|t| t.name() == topic_name) {
        Some(topic) => {
            let partition_count = u64::cast_from(topic.partitions().len());
            if partition_count == 0 {
                bail!("topic {topic_name} has an impossible partition count of zero");
            }
            Ok(partition_count)
        }
        None => bail!("topic {topic_name} does not exist"),
    }
}

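/// Periodically refreshes the partition count of the given topic at the given interval, passing
/// each successfully fetched count to `update_partition_count`. Failures are logged and retried
/// on the next tick.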
async fn fetch_partition_count_loop<F>(
    producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    sink_id: GlobalId,
    topic_name: String,
    interval: Duration,
    update_partition_count: Arc<F>,
) where
    F: Fn(u64),
{
    let mut interval = time::interval(interval);
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;
        match fetch_partition_count(&producer, sink_id, &topic_name).await {
            Ok(pc) => update_partition_count(pc),
            Err(e) => {
                warn!(%sink_id, "failed updating partition count: {e}");
                continue;
            }
        };
    }
}

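/// Encodes a stream of sink batches into Kafka messages according to the sink's envelope and
/// formats.
///
/// Keys and values are encoded with the configured format (bytes, text, JSON, or Avro, with
/// Avro schemas published to the schema registry), headers are extracted from the configured
/// headers column, and the partition hash is derived from the key or the `PARTITION BY`
/// expression.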
fn encode_collection<'scope>(
    name: String,
    batches: SinkBatchStream<'scope>,
    envelope: SinkEnvelope,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
    sink_id: GlobalId,
    from_id: GlobalId,
    key_is_synthetic: bool,
) -> (
    VecCollection<'scope, Timestamp, KafkaMessage, Diff>,
    StreamVec<'scope, Timestamp, HealthStatusMessage>,
    PressOnDropButton,
) {
    let mut builder = AsyncOperatorBuilder::new(name, batches.scope());

    let (output, stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let mut input = builder.new_input_for(batches, Pipeline, &output);

    let (button, errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let key_desc = connection
                .key_desc_and_indices
                .as_ref()
                .map(|(desc, _indices)| desc.clone());
            let value_desc = connection.value_desc;

            let key_encoder: Option<Box<dyn Encode>> =
                match (key_desc, connection.format.key_format) {
                    (Some(desc), Some(KafkaSinkFormatType::Bytes)) => {
                        Some(Box::new(BinaryEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Text)) => {
                        Some(Box::new(TextEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Json)) => {
                        Some(Box::new(JsonEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Avro {
                        schema,
                        compatibility_level,
                        csr_connection,
                    })) => {
                        // Publish the key schema to the schema registry before encoding.
                        let ccsr = csr_connection
                            .connect(&storage_configuration, InTask::Yes)
                            .await?;

                        let schema_id = mz_storage_client::sink::publish_kafka_schema(
                            ccsr,
                            format!("{}-key", connection.topic),
                            schema.clone(),
                            mz_ccsr::SchemaType::Avro,
                            compatibility_level,
                        )
                        .await
                        .context("error publishing kafka schemas for sink")?;

                        Some(Box::new(AvroEncoder::new(desc, false, &schema, schema_id)))
                    }
                    (None, None) => None,
                    (desc, format) => {
                        return Err(anyhow!(
                            "key_desc and key_format must be both set or both unset, but key_desc: {:?}, key_format: {:?}",
                            desc,
                            format
                        ))
                    }
                };

            let debezium = matches!(envelope, SinkEnvelope::Debezium);

            let value_encoder: Box<dyn Encode> = match connection.format.value_format {
                KafkaSinkFormatType::Bytes => Box::new(BinaryEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Text => Box::new(TextEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Json => Box::new(JsonEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level,
                    csr_connection,
                } => {
                    // Publish the value schema to the schema registry before encoding.
                    let ccsr = csr_connection
                        .connect(&storage_configuration, InTask::Yes)
                        .await?;

                    let schema_id = mz_storage_client::sink::publish_kafka_schema(
                        ccsr,
                        format!("{}-value", connection.topic),
                        schema.clone(),
                        mz_ccsr::SchemaType::Avro,
                        compatibility_level,
                    )
                    .await
                    .context("error publishing kafka schemas for sink")?;

                    Box::new(AvroEncoder::new(value_desc, debezium, &schema, schema_id))
                }
            };

            // Drop the initial capabilities; output is produced using the capabilities of
            // incoming data events.
            *capset = CapabilitySet::new();

            let mut row_buf = Row::default();
            let mut datums = DatumVec::new();
            let mut pk_warner =
                (!key_is_synthetic).then(|| PkViolationWarner::new(sink_id, from_id));

            while let Some(event) = input.next().await {
                if let Event::Data(cap, mut batches) = event {
                    for batch in batches.drain(..) {
                        for_each_diff_pair(&batch, |key, time, value| {
                            if let Some(warner) = pk_warner.as_mut() {
                                warner.observe(key, time);
                            }
                            // Only attach a key to the message if a key format is configured.
                            let key_for_message = if key_encoder.is_some() { key } else { &None };

                            let mut hash = None;
                            let mut headers = vec![];
                            if connection.headers_index.is_some()
                                || connection.partition_by.is_some()
                            {
                                let row = value
                                    .after
                                    .as_ref()
                                    .or(value.before.as_ref())
                                    .expect("one of before or after must be set");
                                let row = datums.borrow_with(row);

                                if let Some(i) = connection.headers_index {
                                    headers = encode_headers(row[i]);
                                }

                                if let Some(partition_by) = &connection.partition_by {
                                    hash = Some(evaluate_partition_by(partition_by, &row));
                                }
                            }
                            let (encoded_key, hash) = match key_for_message {
                                Some(key) => {
                                    let key_encoder =
                                        key_encoder.as_ref().expect("key present");
                                    let encoded = key_encoder.encode_unchecked(key.clone());
                                    let hash =
                                        hash.unwrap_or_else(|| key_encoder.hash(&encoded));
                                    (Some(encoded), hash)
                                }
                                None => (None, hash.unwrap_or(0)),
                            };
                            let value = match envelope {
                                SinkEnvelope::Upsert => value.after,
                                SinkEnvelope::Debezium => {
                                    dbz_format(&mut row_buf.packer(), value);
                                    Some(row_buf.clone())
                                }
                                SinkEnvelope::Append => {
                                    unreachable!("Append envelope is not valid for Kafka sinks")
                                }
                            };
                            let value = value.map(|value| value_encoder.encode_unchecked(value));
                            let message = KafkaMessage {
                                hash,
                                key: encoded_key,
                                value,
                                headers,
                            };
                            output.give(&cap, (message, time, Diff::ONE));
                        });
                        if let Some(warner) = pk_warner.as_mut() {
                            warner.flush();
                        }
                    }
                }
            }

            Ok::<(), anyhow::Error>(())
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Kafka,
    });

    (stream.as_collection(), statuses, button.press_on_drop())
}

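/// Converts a map datum from the headers column into a list of Kafka headers. A null datum
/// produces no headers; map values must be null, string, or bytes.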
fn encode_headers(datum: Datum) -> Vec<KafkaHeader> {
    let mut out = vec![];
    if datum.is_null() {
        return out;
    }
    for (key, value) in datum.unwrap_map().iter() {
        out.push(KafkaHeader {
            key: key.into(),
            value: match value {
                Datum::Null => None,
                Datum::String(s) => Some(s.as_bytes().to_vec()),
                Datum::Bytes(b) => Some(b.to_vec()),
                _ => panic!("encode_headers called with unexpected header value {value:?}"),
            },
        })
    }
    out
}

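/// Evaluates the `PARTITION BY` expression against a row, returning the resulting hash as a
/// `u64`. Null results and evaluation errors map to 0.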
fn evaluate_partition_by(partition_by: &MirScalarExpr, row: &[Datum]) -> u64 {
    let temp_storage = RowArena::new();
    match partition_by.eval(row, &temp_storage) {
        Ok(Datum::UInt64(u)) => u,
        Ok(datum) => {
            soft_assert_or_log!(datum.is_null(), "unexpected partition_by result: {datum:?}");
            0
        }
        Err(_) => 0,
    }
}

#[cfg(test)]
mod test {
    use mz_ore::assert_err;

    use super::*;

    #[mz_ore::test]
    fn progress_record_migration() {
        assert_err!(parse_progress_record(b"{}"));

        assert_eq!(
            parse_progress_record(b"{\"timestamp\":1}").unwrap(),
            ProgressRecord {
                frontier: Antichain::from_elem(2.into()),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"timestamp\":null}").unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"frontier\":[1]}").unwrap(),
            ProgressRecord {
                frontier: Antichain::from_elem(1.into()),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"frontier\":[]}").unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"frontier\":[], \"version\": 42}").unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 42,
            }
        );

        assert_err!(parse_progress_record(b"{\"frontier\":null}"));
    }
}