use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::future::Future;
use std::rc::Rc;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Weak};
use std::time::Duration;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::metrics::sink::kafka::KafkaSinkMetrics;
use crate::render::sinks::SinkRender;
use crate::statistics::SinkStatistics;
use crate::storage_state::StorageState;
use anyhow::{Context, anyhow, bail};
use differential_dataflow::{AsCollection, Collection, Hashable};
use futures::StreamExt;
use maplit::btreemap;
use mz_expr::MirScalarExpr;
use mz_interchange::avro::{AvroEncoder, DiffPair};
use mz_interchange::encode::Encode;
use mz_interchange::envelopes::dbz_format;
use mz_interchange::json::JsonEncoder;
use mz_interchange::text_binary::{BinaryEncoder, TextEncoder};
use mz_kafka_util::admin::EnsureTopicConfig;
use mz_kafka_util::client::{
    DEFAULT_FETCH_METADATA_TIMEOUT, GetPartitionsError, MzClientContext, TimeoutConfig,
    TunnelingClientContext,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::task::{self, AbortOnDropHandle};
use mz_ore::vec::VecExt;
use mz_persist_client::Diagnostics;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_client::sink::progress_key::ProgressKey;
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::{
    KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS, SINK_ENSURE_TOPIC_CONFIG, SINK_PROGRESS_SEARCH,
};
use mz_storage_types::errors::{ContextCreationError, ContextCreationErrorExt, DataflowError};
use mz_storage_types::sinks::{
    KafkaSinkConnection, KafkaSinkFormatType, SinkEnvelope, StorageSinkDesc,
};
use mz_storage_types::sources::SourceData;
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
use rdkafka::error::KafkaError;
use rdkafka::message::{Header, OwnedHeaders, ToBytes};
use rdkafka::producer::{BaseRecord, Producer, ThreadedProducer};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::{Message, Offset, Statistics, TopicPartitionList};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use timely::PartialOrder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::{CapabilitySet, Concatenate, Map, ToStream};
use timely::dataflow::{Scope, Stream};
use timely::progress::{Antichain, Timestamp as _};
use tokio::sync::watch;
use tokio::time::{self, MissedTickBehavior};
use tracing::{debug, error, info, warn};

impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for KafkaSinkConnection {
    fn get_key_indices(&self) -> Option<&[usize]> {
        self.key_desc_and_indices
            .as_ref()
            .map(|(_desc, indices)| indices.as_slice())
    }

    fn get_relation_key_indices(&self) -> Option<&[usize]> {
        self.relation_key_indices.as_deref()
    }

    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        input: Collection<G, (Option<Row>, DiffPair<Row>), Diff>,
        _err_collection: Collection<G, DataflowError, Diff>,
    ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
        let mut scope = input.scope();

        let write_handle = {
            let persist = Arc::clone(&storage_state.persist_clients);
            let shard_meta = sink.to_storage_metadata.clone();
            async move {
                let client = persist.open(shard_meta.persist_location).await?;
                let handle = client
                    .open_writer(
                        shard_meta.data_shard,
                        Arc::new(shard_meta.relation_desc),
                        Arc::new(UnitSchema),
                        Diagnostics::from_purpose("sink handle"),
                    )
                    .await?;
                Ok(handle)
            }
        };

        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        storage_state
            .sink_write_frontiers
            .insert(sink_id, Rc::clone(&write_frontier));

        let (encoded, encode_status, encode_token) = encode_collection(
            format!("kafka-{sink_id}-{}-encode", self.format.get_format_name()),
            &input,
            sink.envelope,
            self.clone(),
            storage_state.storage_configuration.clone(),
        );

        let metrics = storage_state.metrics.get_kafka_sink_metrics(sink_id);
        let statistics = storage_state
            .aggregated_statistics
            .get_sink(&sink_id)
            .expect("statistics initialized")
            .clone();

        let (sink_status, sink_token) = sink_collection(
            format!("kafka-{sink_id}-sink"),
            &encoded,
            sink_id,
            self.clone(),
            storage_state.storage_configuration.clone(),
            sink,
            metrics,
            statistics,
            write_handle,
            write_frontier,
        );

        let running_status = Some(HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::Running,
            namespace: StatusNamespace::Kafka,
        })
        .to_stream(&mut scope);

        let status = scope.concatenate([running_status, encode_status, sink_status]);

        (status, vec![encode_token, sink_token])
    }
}

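/// A transactional Kafka producer used by the sink operator. It stages encoded
/// messages for the data topic and, on commit, writes a progress record to the
/// progress topic within the same Kafka transaction.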
struct TransactionalProducer {
    task_name: String,
    data_topic: String,
    progress_topic: String,
    progress_key: ProgressKey,
    sink_version: u64,
    partition_count: Arc<AtomicU64>,
    _partition_count_task: AbortOnDropHandle<()>,
    producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    statistics: SinkStatistics,
    staged_messages: u64,
    staged_bytes: u64,
    socket_timeout: Duration,
    transaction_timeout: Duration,
}

impl TransactionalProducer {
    async fn new(
        sink_id: GlobalId,
        connection: &KafkaSinkConnection,
        storage_configuration: &StorageConfiguration,
        metrics: Arc<KafkaSinkMetrics>,
        statistics: SinkStatistics,
        sink_version: u64,
    ) -> Result<(Self, Antichain<mz_repr::Timestamp>), ContextCreationError> {
        let client_id = connection.client_id(
            storage_configuration.config_set(),
            &storage_configuration.connection_context,
            sink_id,
        );
        let transactional_id =
            connection.transactional_id(&storage_configuration.connection_context, sink_id);

        let timeout_config = &storage_configuration.parameters.kafka_timeout_config;
        let mut options = BTreeMap::new();
        options.insert("enable.idempotence", "true".into());
        options.insert(
            "compression.type",
            connection.compression_type.to_librdkafka_option().into(),
        );
        options.insert("queue.buffering.max.kbytes", "2147483647".into());
        options.insert("queue.buffering.max.messages", "0".into());
        options.insert("queue.buffering.max.ms", format!("{}", 10));
        options.insert(
            "transaction.timeout.ms",
            format!("{}", timeout_config.transaction_timeout.as_millis()),
        );
        options.insert("transactional.id", transactional_id);
        options.insert("client.id", client_id);
        options.insert("statistics.interval.ms", "1000".into());

        let ctx = MzClientContext::default();

        let stats_receiver = ctx.subscribe_statistics();
        let task_name = format!("kafka_sink_metrics_collector:{sink_id}");
        task::spawn(
            || &task_name,
            collect_statistics(stats_receiver, Arc::clone(&metrics)),
        );

        let producer: ThreadedProducer<_> = connection
            .connection
            .create_with_context(storage_configuration, ctx, &options, InTask::Yes)
            .await?;

        let partition_count = Arc::new(AtomicU64::new(0));
        let update_partition_count = {
            let partition_count = Arc::clone(&partition_count);
            let metrics = Arc::clone(&metrics);
            Arc::new(move |pc| {
                partition_count.store(pc, std::sync::atomic::Ordering::SeqCst);
                metrics.partition_count.set(pc);
            })
        };

        let partition_count_task = task::spawn(
            || format!("kafka_sink_producer_fetch_metadata_loop:{sink_id}"),
            fetch_partition_count_loop(
                producer.clone(),
                sink_id,
                connection.topic.clone(),
                connection.topic_metadata_refresh_interval,
                Arc::clone(&update_partition_count),
            ),
        );

        let task_name = format!("kafka_sink_producer:{sink_id}");
        let progress_key = ProgressKey::new(sink_id);

        let producer = Self {
            task_name,
            data_topic: connection.topic.clone(),
            partition_count,
            _partition_count_task: partition_count_task.abort_on_drop(),
            progress_topic: connection
                .progress_topic(&storage_configuration.connection_context)
                .into_owned(),
            progress_key,
            sink_version,
            producer,
            statistics,
            staged_messages: 0,
            staged_bytes: 0,
            socket_timeout: timeout_config.socket_timeout,
            transaction_timeout: timeout_config.transaction_timeout,
        };

        let timeout = timeout_config.socket_timeout;
        producer
            .spawn_blocking(move |p| p.init_transactions(timeout))
            .await?;

        let progress = determine_sink_progress(
            sink_id,
            connection,
            storage_configuration,
            Arc::clone(&metrics),
        )
        .await?;

        let resume_upper = match progress {
            Some(progress) => {
                if sink_version < progress.version {
                    return Err(ContextCreationError::Other(anyhow!(
                        "Fenced off by newer version of the sink. ours={} theirs={}",
                        sink_version,
                        progress.version
                    )));
                }
                progress.frontier
            }
            None => {
                mz_storage_client::sink::ensure_kafka_topic(
                    connection,
                    storage_configuration,
                    &connection.topic,
                    &connection.topic_options,
                    EnsureTopicConfig::Skip,
                )
                .await?;
                Antichain::from_elem(Timestamp::minimum())
            }
        };

        let partition_count =
            fetch_partition_count(&producer.producer, sink_id, &connection.topic).await?;
        update_partition_count(partition_count);

        Ok((producer, resume_upper))
    }

    async fn spawn_blocking<F, R>(&self, f: F) -> Result<R, ContextCreationError>
    where
        F: FnOnce(
                ThreadedProducer<TunnelingClientContext<MzClientContext>>,
            ) -> Result<R, KafkaError>
            + Send
            + 'static,
        R: Send + 'static,
    {
        let producer = self.producer.clone();
        task::spawn_blocking(|| &self.task_name, move || f(producer))
            .await
            .unwrap()
            .check_ssh_status(self.producer.context())
    }

    async fn begin_transaction(&self) -> Result<(), ContextCreationError> {
        self.spawn_blocking(|p| p.begin_transaction()).await
    }

    fn send(
        &mut self,
        message: &KafkaMessage,
        time: Timestamp,
        diff: Diff,
    ) -> Result<(), KafkaError> {
        assert_eq!(diff, Diff::ONE, "invalid sink update");

        let mut headers = OwnedHeaders::new().insert(Header {
            key: "materialize-timestamp",
            value: Some(time.to_string().as_bytes()),
        });
        for header in &message.headers {
            if header.key.starts_with("materialize-") {
                continue;
            }

            headers = headers.insert(Header {
                key: header.key.as_str(),
                value: header.value.as_ref(),
            });
        }

        let pc = self
            .partition_count
            .load(std::sync::atomic::Ordering::SeqCst);
        let partition = Some(i32::try_from(message.hash % pc).unwrap());

        let record = BaseRecord {
            topic: &self.data_topic,
            key: message.key.as_ref(),
            payload: message.value.as_ref(),
            headers: Some(headers),
            partition,
            timestamp: None,
            delivery_opaque: (),
        };
        let key_size = message.key.as_ref().map(|k| k.len()).unwrap_or(0);
        let value_size = message.value.as_ref().map(|v| v.len()).unwrap_or(0);
        let headers_size = message
            .headers
            .iter()
            .map(|h| h.key.len() + h.value.as_ref().map(|v| v.len()).unwrap_or(0))
            .sum::<usize>();
        let record_size = u64::cast_from(key_size + value_size + headers_size);
        self.statistics.inc_messages_staged_by(1);
        self.staged_messages += 1;
        self.statistics.inc_bytes_staged_by(record_size);
        self.staged_bytes += record_size;
        self.producer.send(record).map_err(|(e, _)| e)
    }

    async fn commit_transaction(
        &mut self,
        upper: Antichain<Timestamp>,
    ) -> Result<(), ContextCreationError> {
        let progress = ProgressRecord {
            frontier: upper,
            version: self.sink_version,
        };
        let payload = serde_json::to_vec(&progress).expect("infallible");
        let record = BaseRecord::to(&self.progress_topic)
            .payload(&payload)
            .key(&self.progress_key);
        self.producer.send(record).map_err(|(e, _)| e)?;

        fail::fail_point!("kafka_sink_commit_transaction");

        let timeout = self.transaction_timeout;
        match self
            .spawn_blocking(move |p| p.commit_transaction(timeout))
            .await
        {
            Ok(()) => {
                self.statistics
                    .inc_messages_committed_by(self.staged_messages);
                self.statistics.inc_bytes_committed_by(self.staged_bytes);
                self.staged_messages = 0;
                self.staged_bytes = 0;
                Ok(())
            }
            Err(ContextCreationError::KafkaError(KafkaError::Transaction(err))) => {
                if err.txn_requires_abort() {
                    let timeout = self.socket_timeout;
                    self.spawn_blocking(move |p| p.abort_transaction(timeout))
                        .await?;
                }
                Err(ContextCreationError::KafkaError(KafkaError::Transaction(
                    err,
                )))
            }
            Err(err) => Err(err),
        }
    }
}

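/// Listens for statistics reports from librdkafka, aggregates the per-broker
/// counters, and publishes them to the sink's Prometheus metrics.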
async fn collect_statistics(
    mut receiver: watch::Receiver<Statistics>,
    metrics: Arc<KafkaSinkMetrics>,
) {
    let mut outbuf_cnt: i64 = 0;
    let mut outbuf_msg_cnt: i64 = 0;
    let mut waitresp_cnt: i64 = 0;
    let mut waitresp_msg_cnt: i64 = 0;
    let mut txerrs: u64 = 0;
    let mut txretries: u64 = 0;
    let mut req_timeouts: u64 = 0;
    let mut connects: i64 = 0;
    let mut disconnects: i64 = 0;
    while receiver.changed().await.is_ok() {
        let stats = receiver.borrow();
        for broker in stats.brokers.values() {
            outbuf_cnt += broker.outbuf_cnt;
            outbuf_msg_cnt += broker.outbuf_msg_cnt;
            waitresp_cnt += broker.waitresp_cnt;
            waitresp_msg_cnt += broker.waitresp_msg_cnt;
            txerrs += broker.txerrs;
            txretries += broker.txretries;
            req_timeouts += broker.req_timeouts;
            connects += broker.connects.unwrap_or(0);
            disconnects += broker.disconnects.unwrap_or(0);
        }
        metrics.rdkafka_msg_cnt.set(stats.msg_cnt);
        metrics.rdkafka_msg_size.set(stats.msg_size);
        metrics.rdkafka_txmsgs.set(stats.txmsgs);
        metrics.rdkafka_txmsg_bytes.set(stats.txmsg_bytes);
        metrics.rdkafka_tx.set(stats.tx);
        metrics.rdkafka_tx_bytes.set(stats.tx_bytes);
        metrics.rdkafka_outbuf_cnt.set(outbuf_cnt);
        metrics.rdkafka_outbuf_msg_cnt.set(outbuf_msg_cnt);
        metrics.rdkafka_waitresp_cnt.set(waitresp_cnt);
        metrics.rdkafka_waitresp_msg_cnt.set(waitresp_msg_cnt);
        metrics.rdkafka_txerrs.set(txerrs);
        metrics.rdkafka_txretries.set(txretries);
        metrics.rdkafka_req_timeouts.set(req_timeouts);
        metrics.rdkafka_connects.set(connects);
        metrics.rdkafka_disconnects.set(disconnects);
    }
}

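/// A fully encoded message destined for the data topic, together with the
/// pre-computed partition hash and any user-supplied headers.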
#[derive(Debug, Clone, Serialize, Deserialize)]
struct KafkaMessage {
    hash: u64,
    key: Option<Vec<u8>>,
    value: Option<Vec<u8>>,
    headers: Vec<KafkaHeader>,
}

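/// A single Kafka header: a string key and an optional binary value.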
#[derive(Debug, Clone, Serialize, Deserialize)]
struct KafkaHeader {
    key: String,
    value: Option<Vec<u8>>,
}

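/// Renders the operator that writes the already-encoded collection to Kafka.
/// Only one worker (chosen by hashing the sink id) is active; it groups
/// updates into Kafka transactions bounded by input progress, commits a
/// progress record with each transaction, and advances the sink's
/// persist-backed write frontier.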
fn sink_collection<G: Scope<Timestamp = Timestamp>>(
    name: String,
    input: &Collection<G, KafkaMessage, Diff>,
    sink_id: GlobalId,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    metrics: KafkaSinkMetrics,
    statistics: SinkStatistics,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
) -> (Stream<G, HealthStatusMessage>, PressOnDropButton) {
    let scope = input.scope();
    let mut builder = AsyncOperatorBuilder::new(name.clone(), input.inner.scope());

    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let buffer_min_capacity =
        KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS.handle(storage_configuration.config_set());

    let mut input = builder.new_disconnected_input(&input.inner, Exchange::new(move |_| hashed_id));

    let as_of = sink.as_of.clone();
    let sink_version = sink.version;
    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            if !is_active_worker {
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            fail::fail_point!("kafka_sink_creation_error", |_| Err(
                ContextCreationError::Other(anyhow::anyhow!("synthetic error"))
            ));

            let mut write_handle = write_handle.await?;

            let metrics = Arc::new(metrics);

            let (mut producer, resume_upper) = TransactionalProducer::new(
                sink_id,
                &connection,
                &storage_configuration,
                Arc::clone(&metrics),
                statistics,
                sink_version,
            )
            .await?;

            let overcompacted =
                *resume_upper != [Timestamp::minimum()]
                    && !PartialOrder::less_equal(&as_of, &resume_upper);
            if overcompacted {
                let err = format!(
                    "{name}: input compacted past resume upper: as_of {}, resume_upper: {}",
                    as_of.pretty(),
                    resume_upper.pretty()
                );
                error!("{err}");
                return Err(anyhow!("{err}").into());
            }

            info!(
                "{name}: as_of: {}, resume upper: {}",
                as_of.pretty(),
                resume_upper.pretty()
            );

            let Some(mut upper) = resume_upper.clone().into_option() else {
                write_frontier.borrow_mut().clear();
                return Ok(());
            };

            let mut deferred_updates = vec![];
            let mut extra_updates = vec![];
            let mut transaction_begun = false;
            while let Some(event) = input.next().await {
                match event {
                    Event::Data(_cap, batch) => {
                        for (message, time, diff) in batch {
                            match upper.cmp(&time) {
                                Ordering::Less => deferred_updates.push((message, time, diff)),
                                Ordering::Equal => {
                                    if !transaction_begun {
                                        producer.begin_transaction().await?;
                                        transaction_begun = true;
                                    }
                                    producer.send(&message, time, diff)?;
                                }
                                Ordering::Greater => continue,
                            }
                        }
                    }
                    Event::Progress(progress) => {
                        if !PartialOrder::less_equal(&resume_upper, &progress) {
                            continue;
                        }
                        if !as_of.iter().all(|t| !progress.less_equal(t)) {
                            continue;
                        }
                        if !transaction_begun {
                            producer.begin_transaction().await?;
                        }

                        deferred_updates.shrink_to(buffer_min_capacity.get());
                        extra_updates.extend(
                            deferred_updates
                                .drain_filter_swapping(|(_, time, _)| !progress.less_equal(time)),
                        );
                        extra_updates.sort_unstable_by(|a, b| a.1.cmp(&b.1));

                        extra_updates.shrink_to(buffer_min_capacity.get());
                        for (message, time, diff) in extra_updates.drain(..) {
                            producer.send(&message, time, diff)?;
                        }

                        debug!("{name}: committing transaction for {}", progress.pretty());
                        producer.commit_transaction(progress.clone()).await?;
                        transaction_begun = false;
                        let mut expect_upper = write_handle.shared_upper();
                        loop {
                            if PartialOrder::less_equal(&progress, &expect_upper) {
                                break;
                            }
                            const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                            match write_handle
                                .compare_and_append(EMPTY, expect_upper, progress.clone())
                                .await
                                .expect("valid usage")
                            {
                                Ok(()) => break,
                                Err(mismatch) => {
                                    expect_upper = mismatch.current;
                                }
                            }
                        }
                        write_frontier.borrow_mut().clone_from(&progress);
                        match progress.into_option() {
                            Some(new_upper) => upper = new_upper,
                            None => break,
                        }
                    }
                }
            }
            Ok(())
        })
    });

    let statuses = errors.map(|error: Rc<ContextCreationError>| {
        let hint = match *error {
            ContextCreationError::KafkaError(KafkaError::Transaction(ref e)) => {
                if e.is_retriable() && e.code() == RDKafkaErrorCode::OperationTimedOut {
                    let hint = "If you're running a single Kafka broker, ensure that the configs \
                        transaction.state.log.replication.factor, transaction.state.log.min.isr, \
                        and offsets.topic.replication.factor are set to 1 on the broker";
                    Some(hint.to_owned())
                } else {
                    None
                }
            }
            _ => None,
        };

        HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), hint),
            namespace: if matches!(*error, ContextCreationError::Ssh(_)) {
                StatusNamespace::Ssh
            } else {
                StatusNamespace::Kafka
            },
        }
    });

    (statuses, button.press_on_drop())
}

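/// Determines the most recent progress record for this sink by reading the
/// progress topic, returning `None` if the topic does not exist or contains no
/// record for this sink's progress key.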
async fn determine_sink_progress(
    sink_id: GlobalId,
    connection: &KafkaSinkConnection,
    storage_configuration: &StorageConfiguration,
    metrics: Arc<KafkaSinkMetrics>,
) -> Result<Option<ProgressRecord>, ContextCreationError> {
    let TimeoutConfig {
        fetch_metadata_timeout,
        progress_record_fetch_timeout,
        ..
    } = storage_configuration.parameters.kafka_timeout_config;

    let client_id = connection.client_id(
        storage_configuration.config_set(),
        &storage_configuration.connection_context,
        sink_id,
    );
    let group_id = connection.progress_group_id(&storage_configuration.connection_context, sink_id);
    let progress_topic = connection
        .progress_topic(&storage_configuration.connection_context)
        .into_owned();
    let progress_topic_options = &connection.connection.progress_topic_options;
    let progress_key = ProgressKey::new(sink_id);

    let common_options = btreemap! {
        "group.id" => group_id,
        "client.id" => client_id,
        "enable.auto.commit" => "false".into(),
        "auto.offset.reset" => "earliest".into(),
        "enable.partition.eof" => "true".into(),
    };

    let progress_client_read_committed: BaseConsumer<_> = {
        let mut opts = common_options.clone();
        opts.insert("isolation.level", "read_committed".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    let progress_client_read_uncommitted: BaseConsumer<_> = {
        let mut opts = common_options;
        opts.insert("isolation.level", "read_uncommitted".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    let ctx = Arc::clone(progress_client_read_committed.client().context());

    let ensure_topic_config =
        match &*SINK_ENSURE_TOPIC_CONFIG.get(storage_configuration.config_set()) {
            "skip" => EnsureTopicConfig::Skip,
            "check" => EnsureTopicConfig::Check,
            "alter" => EnsureTopicConfig::Alter,
            _ => {
                tracing::warn!(
                    topic = progress_topic,
                    "unexpected value for ensure-topic-config; skipping checks"
                );
                EnsureTopicConfig::Skip
            }
        };
    mz_storage_client::sink::ensure_kafka_topic(
        connection,
        storage_configuration,
        &progress_topic,
        progress_topic_options,
        ensure_topic_config,
    )
    .await
    .add_context("error registering kafka progress topic for sink")?;

    let parent_token = Arc::new(());
    let child_token = Arc::downgrade(&parent_token);
    let task_name = format!("get_latest_ts:{sink_id}");
    let sink_progress_search = SINK_PROGRESS_SEARCH.get(storage_configuration.config_set());
    let result = task::spawn_blocking(|| task_name, move || {
        let progress_topic = progress_topic.as_ref();
        let partitions = match mz_kafka_util::client::get_partitions(
            progress_client_read_committed.client(),
            progress_topic,
            fetch_metadata_timeout,
        ) {
            Ok(partitions) => partitions,
            Err(GetPartitionsError::TopicDoesNotExist) => {
                return Ok(None);
            }
            e => e.with_context(|| {
                format!(
                    "Unable to fetch metadata about progress topic {}",
                    progress_topic
                )
            })?,
        };
        if partitions.len() != 1 {
            bail!(
                "Progress topic {} should contain a single partition, but instead contains {} partitions",
                progress_topic, partitions.len(),
            );
        }
        let partition = partitions.into_element();

        metrics.consumed_progress_records.set(0);

        let (lo, hi) = progress_client_read_uncommitted
            .fetch_watermarks(progress_topic, partition, fetch_metadata_timeout)
            .map_err(|e| {
                anyhow!(
                    "Failed to fetch metadata while reading from progress topic: {}",
                    e
                )
            })?;

        let mut start_indices = vec![lo];
        if sink_progress_search {
            let mut lookback = hi.saturating_sub(lo) / 10;
            while lookback >= 20_000 {
                start_indices.push(hi - lookback);
                lookback /= 10;
            }
        }
        for lo in start_indices.into_iter().rev() {
            if let Some(found) = progress_search(
                &progress_client_read_committed,
                progress_record_fetch_timeout,
                progress_topic,
                partition,
                lo,
                hi,
                progress_key.clone(),
                Weak::clone(&child_token),
                Arc::clone(&metrics),
            )? {
                return Ok(Some(found));
            }
        }
        Ok(None)
    }).await.unwrap().check_ssh_status(&ctx);
    drop(parent_token);
    result
}

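/// Scans the progress topic from offset `lo` towards the high watermark `hi`
/// and returns the last progress record whose key matches `progress_key`.
/// Bails out if a record's frontier regresses or if the parent task has been
/// dropped (observed via `child_token`).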
fn progress_search<C: ConsumerContext + 'static>(
    progress_client_read_committed: &BaseConsumer<C>,
    progress_record_fetch_timeout: Duration,
    progress_topic: &str,
    partition: i32,
    lo: i64,
    hi: i64,
    progress_key: ProgressKey,
    child_token: Weak<()>,
    metrics: Arc<KafkaSinkMetrics>,
) -> anyhow::Result<Option<ProgressRecord>> {
    let mut tps = TopicPartitionList::new();
    tps.add_partition(progress_topic, partition);
    tps.set_partition_offset(progress_topic, partition, Offset::Offset(lo))?;
    progress_client_read_committed
        .assign(&tps)
        .with_context(|| {
            format!(
                "Error seeking in progress topic {}:{}",
                progress_topic, partition
            )
        })?;

    let get_position = || {
        if child_token.strong_count() == 0 {
            bail!("operation cancelled");
        }
        let position = progress_client_read_committed
            .position()?
            .find_partition(progress_topic, partition)
            .ok_or_else(|| {
                anyhow!(
                    "No position info found for progress topic {}",
                    progress_topic
                )
            })?
            .offset();
        let position = match position {
            Offset::Offset(position) => position,
            Offset::Invalid => lo,
            _ => bail!(
                "Consumer::position returned offset of wrong type: {:?}",
                position
            ),
        };
        let outstanding = u64::try_from(std::cmp::max(0, hi - position)).unwrap();
        metrics.outstanding_progress_records.set(outstanding);
        Ok(position)
    };

    info!("fetching latest progress record for {progress_key}, lo/hi: {lo}/{hi}");

    let mut last_progress: Option<ProgressRecord> = None;
    loop {
        let current_position = get_position()?;

        if current_position >= hi {
            break;
        }

        let message = match progress_client_read_committed.poll(progress_record_fetch_timeout) {
            Some(Ok(message)) => message,
            Some(Err(KafkaError::PartitionEOF(_))) => {
                continue;
            }
            Some(Err(e)) => bail!("failed to fetch progress message {e}"),
            None => {
                bail!(
                    "timed out while waiting to reach high water mark of non-empty \
                     topic {progress_topic}:{partition}, lo/hi: {lo}/{hi}, current position: {current_position}"
                );
            }
        };

        if message.key() != Some(progress_key.to_bytes()) {
            continue;
        }

        metrics.consumed_progress_records.inc();

        let Some(payload) = message.payload() else {
            continue;
        };
        let progress = parse_progress_record(payload)?;

        match last_progress {
            Some(last_progress)
                if !PartialOrder::less_equal(&last_progress.frontier, &progress.frontier) =>
            {
                bail!(
                    "upper regressed in topic {progress_topic}:{partition} from {:?} to {:?}",
                    &last_progress.frontier,
                    &progress.frontier,
                );
            }
            _ => last_progress = Some(progress),
        }
    }

    Ok(last_progress)
}

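/// The legacy progress record format, in which the sink's progress was stored
/// as a single optional timestamp rather than a frontier.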
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct LegacyProgressRecord {
    #[serde(default, deserialize_with = "deserialize_some")]
    pub timestamp: Option<Option<Timestamp>>,
}

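/// Deserializes any present value as `Some(value)`, so a missing field
/// (`None`) can be distinguished from an explicit `null` (`Some(None)`).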
fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
where
    T: Deserialize<'de>,
    D: Deserializer<'de>,
{
    Deserialize::deserialize(deserializer).map(Some)
}

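/// The current progress record format: the sink's write frontier plus the sink
/// version that wrote it, serialized as JSON, e.g. `{"frontier":[1],"version":0}`
/// (see the tests in this module for the accepted encodings).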
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ProgressRecord {
    #[serde(
        deserialize_with = "deserialize_frontier",
        serialize_with = "serialize_frontier"
    )]
    pub frontier: Antichain<Timestamp>,
    #[serde(default)]
    pub version: u64,
}

fn serialize_frontier<S>(frontier: &Antichain<Timestamp>, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    Serialize::serialize(frontier.elements(), serializer)
}

fn deserialize_frontier<'de, D>(deserializer: D) -> Result<Antichain<Timestamp>, D::Error>
where
    D: Deserializer<'de>,
{
    let times: Vec<Timestamp> = Deserialize::deserialize(deserializer)?;
    Ok(Antichain::from(times))
}

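/// Parses a progress record payload, falling back to the legacy format. A
/// legacy timestamp `t` denotes a frontier of `[t + 1]`, and a legacy `null`
/// denotes the empty frontier.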
fn parse_progress_record(payload: &[u8]) -> Result<ProgressRecord, anyhow::Error> {
    Ok(match serde_json::from_slice::<ProgressRecord>(payload) {
        Ok(progress) => progress,
        Err(_) => match serde_json::from_slice::<LegacyProgressRecord>(payload) {
            Ok(LegacyProgressRecord {
                timestamp: Some(Some(time)),
            }) => ProgressRecord {
                frontier: Antichain::from_elem(time.step_forward()),
                version: 0,
            },
            Ok(LegacyProgressRecord {
                timestamp: Some(None),
            }) => ProgressRecord {
                frontier: Antichain::new(),
                version: 0,
            },
            _ => match std::str::from_utf8(payload) {
                Ok(payload) => bail!("invalid progress record: {payload}"),
                Err(_) => bail!("invalid progress record bytes: {payload:?}"),
            },
        },
    })
}

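/// Fetches the current partition count for the given topic, erroring if the
/// topic does not exist or reports zero partitions.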
async fn fetch_partition_count(
    producer: &ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    sink_id: GlobalId,
    topic_name: &str,
) -> Result<u64, anyhow::Error> {
    let meta = task::spawn_blocking(|| format!("kafka_sink_fetch_partition_count:{sink_id}"), {
        let producer = producer.clone();
        move || {
            producer
                .client()
                .fetch_metadata(None, DEFAULT_FETCH_METADATA_TIMEOUT)
        }
    })
    .await
    .expect("spawning blocking task cannot fail")
    .check_ssh_status(producer.context())?;

    match meta.topics().iter().find(|t| t.name() == topic_name) {
        Some(topic) => {
            let partition_count = u64::cast_from(topic.partitions().len());
            if partition_count == 0 {
                bail!("topic {topic_name} has an impossible partition count of zero");
            }
            Ok(partition_count)
        }
        None => bail!("topic {topic_name} does not exist"),
    }
}

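/// Periodically refreshes the partition count for `topic_name` at the given
/// interval, reporting each successful result via `update_partition_count`.
/// Fetch errors are logged and retried on the next tick.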
async fn fetch_partition_count_loop<F>(
    producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    sink_id: GlobalId,
    topic_name: String,
    interval: Duration,
    update_partition_count: Arc<F>,
) where
    F: Fn(u64),
{
    let mut interval = time::interval(interval);
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;
        match fetch_partition_count(&producer, sink_id, &topic_name).await {
            Ok(pc) => update_partition_count(pc),
            Err(e) => {
                warn!(%sink_id, "failed updating partition count: {e}");
                continue;
            }
        };
    }
}

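/// Renders the operator that encodes rows of the input collection into
/// `KafkaMessage`s using the key and value formats configured on the
/// connection, computing the partition hash and headers for each message.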
fn encode_collection<G: Scope>(
    name: String,
    input: &Collection<G, (Option<Row>, DiffPair<Row>), Diff>,
    envelope: SinkEnvelope,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
) -> (
    Collection<G, KafkaMessage, Diff>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
) {
    let mut builder = AsyncOperatorBuilder::new(name, input.inner.scope());

    let (output, stream) = builder.new_output();
    let mut input = builder.new_input_for(&input.inner, Pipeline, &output);

    let (button, errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let key_desc = connection
                .key_desc_and_indices
                .as_ref()
                .map(|(desc, _indices)| desc.clone());
            let value_desc = connection.value_desc;

            let key_encoder: Option<Box<dyn Encode>> =
                match (key_desc, connection.format.key_format) {
                    (Some(desc), Some(KafkaSinkFormatType::Bytes)) => {
                        Some(Box::new(BinaryEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Text)) => {
                        Some(Box::new(TextEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Json)) => {
                        Some(Box::new(JsonEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Avro {
                        schema,
                        compatibility_level,
                        csr_connection,
                    })) => {
                        let ccsr = csr_connection
                            .connect(&storage_configuration, InTask::Yes)
                            .await?;

                        let schema_id = mz_storage_client::sink::publish_kafka_schema(
                            ccsr,
                            format!("{}-key", connection.topic),
                            schema.clone(),
                            mz_ccsr::SchemaType::Avro,
                            compatibility_level,
                        )
                        .await
                        .context("error publishing kafka schemas for sink")?;

                        Some(Box::new(AvroEncoder::new(desc, false, &schema, schema_id)))
                    }
                    (None, None) => None,
                    (desc, format) => {
                        return Err(anyhow!(
                            "key_desc and key_format must be both set or both unset, but key_desc: {:?}, key_format: {:?}",
                            desc,
                            format
                        ))
                    }
                };

            let debezium = matches!(envelope, SinkEnvelope::Debezium);

            let value_encoder: Box<dyn Encode> = match connection.format.value_format {
                KafkaSinkFormatType::Bytes => Box::new(BinaryEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Text => Box::new(TextEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Json => Box::new(JsonEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level,
                    csr_connection,
                } => {
                    let ccsr = csr_connection
                        .connect(&storage_configuration, InTask::Yes)
                        .await?;

                    let schema_id = mz_storage_client::sink::publish_kafka_schema(
                        ccsr,
                        format!("{}-value", connection.topic),
                        schema.clone(),
                        mz_ccsr::SchemaType::Avro,
                        compatibility_level,
                    )
                    .await
                    .context("error publishing kafka schemas for sink")?;

                    Box::new(AvroEncoder::new(value_desc, debezium, &schema, schema_id))
                }
            };

            *capset = CapabilitySet::new();

            let mut row_buf = Row::default();
            let mut datums = DatumVec::new();

            while let Some(event) = input.next().await {
                if let Event::Data(cap, rows) = event {
                    for ((key, value), time, diff) in rows {
                        let mut hash = None;
                        let mut headers = vec![];
                        if connection.headers_index.is_some() || connection.partition_by.is_some() {
                            let row = value
                                .after
                                .as_ref()
                                .or(value.before.as_ref())
                                .expect("one of before or after must be set");
                            let row = datums.borrow_with(row);

                            if let Some(i) = connection.headers_index {
                                headers = encode_headers(row[i]);
                            }

                            if let Some(partition_by) = &connection.partition_by {
                                hash = Some(evaluate_partition_by(partition_by, &row));
                            }
                        }
                        let (key, hash) = match key {
                            Some(key) => {
                                let key_encoder = key_encoder.as_ref().expect("key present");
                                let key = key_encoder.encode_unchecked(key);
                                let hash = hash.unwrap_or_else(|| key_encoder.hash(&key));
                                (Some(key), hash)
                            }
                            None => (None, hash.unwrap_or(0)),
                        };
                        let value = match envelope {
                            SinkEnvelope::Upsert => value.after,
                            SinkEnvelope::Debezium => {
                                dbz_format(&mut row_buf.packer(), value);
                                Some(row_buf.clone())
                            }
                        };
                        let value = value.map(|value| value_encoder.encode_unchecked(value));
                        let message = KafkaMessage {
                            hash,
                            key,
                            value,
                            headers,
                        };
                        output.give(&cap, (message, time, diff));
                    }
                }
            }
            Ok::<(), anyhow::Error>(())
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Kafka,
    });

    (stream.as_collection(), statuses, button.press_on_drop())
}

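/// Converts the map datum in the designated headers column into Kafka headers.
/// A null datum produces no headers; string and bytes values are passed
/// through, and null values become headers with no value.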
fn encode_headers(datum: Datum) -> Vec<KafkaHeader> {
    let mut out = vec![];
    if datum.is_null() {
        return out;
    }
    for (key, value) in datum.unwrap_map().iter() {
        out.push(KafkaHeader {
            key: key.into(),
            value: match value {
                Datum::Null => None,
                Datum::String(s) => Some(s.as_bytes().to_vec()),
                Datum::Bytes(b) => Some(b.to_vec()),
                _ => panic!("encode_headers called with unexpected header value {value:?}"),
            },
        })
    }
    out
}

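/// Evaluates the configured partition-by expression against the row to produce
/// the partition hash. Negative integers and evaluation errors map to 0.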
fn evaluate_partition_by(partition_by: &MirScalarExpr, row: &[Datum]) -> u64 {
    let temp_storage = RowArena::new();
    match partition_by.eval(row, &temp_storage) {
        Ok(hash) => match hash {
            Datum::Int32(i) => i.try_into().unwrap_or(0),
            Datum::Int64(i) => i.try_into().unwrap_or(0),
            Datum::UInt32(u) => u64::from(u),
            Datum::UInt64(u) => u,
            _ => unreachable!(),
        },
        Err(_) => 0,
    }
}

#[cfg(test)]
mod test {
    use mz_ore::assert_err;

    use super::*;

    #[mz_ore::test]
    fn progress_record_migration() {
        assert_err!(parse_progress_record(b"{}"));

        assert_eq!(
            parse_progress_record(b"{\"timestamp\":1}").unwrap(),
            ProgressRecord {
                frontier: Antichain::from_elem(2.into()),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"timestamp\":null}").unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"frontier\":[1]}").unwrap(),
            ProgressRecord {
                frontier: Antichain::from_elem(1.into()),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"frontier\":[]}").unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"frontier\":[], \"version\": 42}").unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 42,
            }
        );

        assert_err!(parse_progress_record(b"{\"frontier\":null}"));
    }
}