1use std::cell::RefCell;
74use std::cmp::Ordering;
75use std::collections::BTreeMap;
76use std::future::Future;
77use std::rc::Rc;
78use std::sync::atomic::AtomicU64;
79use std::sync::{Arc, Weak};
80use std::time::Duration;
81
82use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
83use crate::metrics::sink::kafka::KafkaSinkMetrics;
84use crate::render::sinks::SinkRender;
85use crate::statistics::SinkStatistics;
86use crate::storage_state::StorageState;
87use anyhow::{Context, anyhow, bail};
88use differential_dataflow::{AsCollection, Collection, Hashable};
89use futures::StreamExt;
90use maplit::btreemap;
91use mz_expr::MirScalarExpr;
92use mz_interchange::avro::{AvroEncoder, DiffPair};
93use mz_interchange::encode::Encode;
94use mz_interchange::envelopes::dbz_format;
95use mz_interchange::json::JsonEncoder;
96use mz_interchange::text_binary::{BinaryEncoder, TextEncoder};
97use mz_kafka_util::admin::EnsureTopicConfig;
98use mz_kafka_util::client::{
99 DEFAULT_FETCH_METADATA_TIMEOUT, GetPartitionsError, MzClientContext, TimeoutConfig,
100 TunnelingClientContext,
101};
102use mz_ore::cast::CastFrom;
103use mz_ore::collections::CollectionExt;
104use mz_ore::error::ErrorExt;
105use mz_ore::future::InTask;
106use mz_ore::task::{self, AbortOnDropHandle};
107use mz_persist_client::Diagnostics;
108use mz_persist_client::write::WriteHandle;
109use mz_persist_types::codec_impls::UnitSchema;
110use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
111use mz_storage_client::sink::progress_key::ProgressKey;
112use mz_storage_types::StorageDiff;
113use mz_storage_types::configuration::StorageConfiguration;
114use mz_storage_types::controller::CollectionMetadata;
115use mz_storage_types::dyncfgs::{
116 KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS, SINK_ENSURE_TOPIC_CONFIG, SINK_PROGRESS_SEARCH,
117};
118use mz_storage_types::errors::{ContextCreationError, ContextCreationErrorExt, DataflowError};
119use mz_storage_types::sinks::{
120 KafkaSinkConnection, KafkaSinkFormatType, SinkEnvelope, StorageSinkDesc,
121};
122use mz_storage_types::sources::SourceData;
123use mz_timely_util::antichain::AntichainExt;
124use mz_timely_util::builder_async::{
125 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
126};
127use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
128use rdkafka::error::KafkaError;
129use rdkafka::message::{Header, OwnedHeaders, ToBytes};
130use rdkafka::producer::{BaseRecord, Producer, ThreadedProducer};
131use rdkafka::types::RDKafkaErrorCode;
132use rdkafka::{Message, Offset, Statistics, TopicPartitionList};
133use serde::{Deserialize, Deserializer, Serialize, Serializer};
134use timely::PartialOrder;
135use timely::dataflow::channels::pact::{Exchange, Pipeline};
136use timely::dataflow::operators::{CapabilitySet, Concatenate, Map, ToStream};
137use timely::dataflow::{Scope, Stream};
138use timely::progress::{Antichain, Timestamp as _};
139use tokio::sync::watch;
140use tokio::time::{self, MissedTickBehavior};
141use tracing::{debug, error, info, warn};
142
143impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for KafkaSinkConnection {
144 fn get_key_indices(&self) -> Option<&[usize]> {
145 self.key_desc_and_indices
146 .as_ref()
147 .map(|(_desc, indices)| indices.as_slice())
148 }
149
150 fn get_relation_key_indices(&self) -> Option<&[usize]> {
151 self.relation_key_indices.as_deref()
152 }
153
154 fn render_sink(
155 &self,
156 storage_state: &mut StorageState,
157 sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
158 sink_id: GlobalId,
159 input: Collection<G, (Option<Row>, DiffPair<Row>), Diff>,
160 _err_collection: Collection<G, DataflowError, Diff>,
163 ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
164 let mut scope = input.scope();
165
166 let write_handle = {
167 let persist = Arc::clone(&storage_state.persist_clients);
168 let shard_meta = sink.to_storage_metadata.clone();
169 async move {
170 let client = persist.open(shard_meta.persist_location).await?;
171 let handle = client
172 .open_writer(
173 shard_meta.data_shard,
174 Arc::new(shard_meta.relation_desc),
175 Arc::new(UnitSchema),
176 Diagnostics::from_purpose("sink handle"),
177 )
178 .await?;
179 Ok(handle)
180 }
181 };
182
183 let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
184 storage_state
185 .sink_write_frontiers
186 .insert(sink_id, Rc::clone(&write_frontier));
187
188 let (encoded, encode_status, encode_token) = encode_collection(
189 format!("kafka-{sink_id}-{}-encode", self.format.get_format_name()),
190 &input,
191 sink.envelope,
192 self.clone(),
193 storage_state.storage_configuration.clone(),
194 );
195
196 let metrics = storage_state.metrics.get_kafka_sink_metrics(sink_id);
197 let statistics = storage_state
198 .aggregated_statistics
199 .get_sink(&sink_id)
200 .expect("statistics initialized")
201 .clone();
202
203 let (sink_status, sink_token) = sink_collection(
204 format!("kafka-{sink_id}-sink"),
205 &encoded,
206 sink_id,
207 self.clone(),
208 storage_state.storage_configuration.clone(),
209 sink,
210 metrics,
211 statistics,
212 write_handle,
213 write_frontier,
214 );
215
216 let running_status = Some(HealthStatusMessage {
217 id: None,
218 update: HealthStatusUpdate::Running,
219 namespace: StatusNamespace::Kafka,
220 })
221 .to_stream(&mut scope);
222
223 let status = scope.concatenate([running_status, encode_status, sink_status]);
224
225 (status, vec![encode_token, sink_token])
226 }
227}
228
/// A Kafka producer that stages data messages and commits them, together with a progress
/// record, inside Kafka transactions.
struct TransactionalProducer {
    /// The name given to blocking tasks spawned through `spawn_blocking`.
    task_name: String,
    /// The topic that data messages are sent to.
    data_topic: String,
    /// The topic that progress records are sent to when committing.
    progress_topic: String,
    /// The key attached to the progress records written by this producer.
    progress_key: ProgressKey,
    /// The sink version recorded in every progress record.
    sink_version: u64,
    /// The partition count of the data topic, used by `send` to pick a partition.
    partition_count: Arc<AtomicU64>,
    /// Handle of the task that periodically refreshes `partition_count`; aborted on drop.
    _partition_count_task: AbortOnDropHandle<()>,
    /// The underlying rdkafka producer.
    producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    /// Statistics handle updated as messages are staged and committed.
    statistics: SinkStatistics,
    /// Number of messages sent since the last successful commit.
    staged_messages: u64,
    /// Number of bytes sent since the last successful commit.
    staged_bytes: u64,
    /// Timeout used when initializing and aborting transactions.
    socket_timeout: Duration,
    /// Timeout used when committing transactions.
    transaction_timeout: Duration,
}
259
impl TransactionalProducer {
    /// Creates a transactional producer for `sink_id` and determines where the sink should
    /// resume from.
    ///
    /// On success returns the producer together with the resume frontier: the frontier of the
    /// latest progress record found in the progress topic, or the minimum timestamp when no
    /// progress record exists.
    ///
    /// # Errors
    ///
    /// Returns an error if any Kafka client cannot be created, if transactions cannot be
    /// initialized, if the progress or data topic cannot be read or ensured, or if the progress
    /// topic contains a record with a version newer than `sink_version`.
    async fn new(
        sink_id: GlobalId,
        connection: &KafkaSinkConnection,
        storage_configuration: &StorageConfiguration,
        metrics: Arc<KafkaSinkMetrics>,
        statistics: SinkStatistics,
        sink_version: u64,
    ) -> Result<(Self, Antichain<mz_repr::Timestamp>), ContextCreationError> {
        let client_id = connection.client_id(
            storage_configuration.config_set(),
            &storage_configuration.connection_context,
            sink_id,
        );
        let transactional_id =
            connection.transactional_id(&storage_configuration.connection_context, sink_id);

        let timeout_config = &storage_configuration.parameters.kafka_timeout_config;
        // librdkafka producer options.
        let mut options = BTreeMap::new();
        options.insert("enable.idempotence", "true".into());
        options.insert(
            "compression.type",
            connection.compression_type.to_librdkafka_option().into(),
        );
        // NOTE(review): these two look like "effectively unlimited" producer queue limits --
        // confirm against the librdkafka configuration reference.
        options.insert("queue.buffering.max.kbytes", "2147483647".into());
        options.insert("queue.buffering.max.messages", "0".into());
        options.insert("queue.buffering.max.ms", format!("{}", 10));
        options.insert(
            "transaction.timeout.ms",
            format!("{}", timeout_config.transaction_timeout.as_millis()),
        );
        options.insert("transactional.id", transactional_id);
        options.insert("client.id", client_id);
        // Statistics are requested every second and consumed by `collect_statistics` below.
        options.insert("statistics.interval.ms", "1000".into());

        let ctx = MzClientContext::default();

        // Spawn a task that forwards librdkafka statistics into our metrics.
        let stats_receiver = ctx.subscribe_statistics();
        let task_name = format!("kafka_sink_metrics_collector:{sink_id}");
        task::spawn(
            || &task_name,
            collect_statistics(stats_receiver, Arc::clone(&metrics)),
        );

        let producer: ThreadedProducer<_> = connection
            .connection
            .create_with_context(storage_configuration, ctx, &options, InTask::Yes)
            .await?;

        // The partition count starts at zero and is set to a real value before `new` returns.
        let partition_count = Arc::new(AtomicU64::new(0));
        // Shared callback that publishes a new partition count to both the producer and metrics.
        let update_partition_count = {
            let partition_count = Arc::clone(&partition_count);
            let metrics = Arc::clone(&metrics);
            Arc::new(move |pc| {
                partition_count.store(pc, std::sync::atomic::Ordering::SeqCst);
                metrics.partition_count.set(pc);
            })
        };

        // Periodically refresh the partition count in the background.
        let partition_count_task = task::spawn(
            || format!("kafka_sink_producer_fetch_metadata_loop:{sink_id}"),
            fetch_partition_count_loop(
                producer.clone(),
                sink_id,
                connection.topic.clone(),
                connection.topic_metadata_refresh_interval,
                Arc::clone(&update_partition_count),
            ),
        );

        let task_name = format!("kafka_sink_producer:{sink_id}");
        let progress_key = ProgressKey::new(sink_id);

        let producer = Self {
            task_name,
            data_topic: connection.topic.clone(),
            partition_count,
            _partition_count_task: partition_count_task.abort_on_drop(),
            progress_topic: connection
                .progress_topic(&storage_configuration.connection_context)
                .into_owned(),
            progress_key,
            sink_version,
            producer,
            statistics,
            staged_messages: 0,
            staged_bytes: 0,
            socket_timeout: timeout_config.socket_timeout,
            transaction_timeout: timeout_config.transaction_timeout,
        };

        // Transactions are initialized before the progress topic is read below.
        // NOTE(review): presumably this ordering fences out older producers with the same
        // transactional id before we read progress -- confirm.
        let timeout = timeout_config.socket_timeout;
        producer
            .spawn_blocking(move |p| p.init_transactions(timeout))
            .await?;

        let progress = determine_sink_progress(
            sink_id,
            connection,
            storage_configuration,
            Arc::clone(&metrics),
        )
        .await?;

        let resume_upper = match progress {
            Some(progress) => {
                // Refuse to run if a newer version of this sink has already written progress.
                if sink_version < progress.version {
                    return Err(ContextCreationError::Other(anyhow!(
                        "Fenced off by newer version of the sink. ours={} theirs={}",
                        sink_version,
                        progress.version
                    )));
                }
                progress.frontier
            }
            // No progress record was found: ensure the data topic and start from the minimum
            // timestamp.
            None => {
                mz_storage_client::sink::ensure_kafka_topic(
                    connection,
                    storage_configuration,
                    &connection.topic,
                    &connection.topic_options,
                    EnsureTopicConfig::Skip,
                )
                .await?;
                Antichain::from_elem(Timestamp::minimum())
            }
        };

        // Fetch the partition count once synchronously so that `send` never observes the
        // initial value of zero.
        let partition_count =
            fetch_partition_count(&producer.producer, sink_id, &connection.topic).await?;
        update_partition_count(partition_count);

        Ok((producer, resume_upper))
    }

    /// Runs the blocking producer operation `f` on a clone of the producer in a blocking task.
    ///
    /// The result of `f` is passed through `check_ssh_status` with the producer's context before
    /// being returned.
    ///
    /// # Panics
    ///
    /// Panics if awaiting the blocking task returns an error.
    async fn spawn_blocking<F, R>(&self, f: F) -> Result<R, ContextCreationError>
    where
        F: FnOnce(
                ThreadedProducer<TunnelingClientContext<MzClientContext>>,
            ) -> Result<R, KafkaError>
            + Send
            + 'static,
        R: Send + 'static,
    {
        let producer = self.producer.clone();
        task::spawn_blocking(|| &self.task_name, move || f(producer))
            .await
            .unwrap()
            .check_ssh_status(self.producer.context())
    }

    /// Begins a new Kafka transaction.
    async fn begin_transaction(&self) -> Result<(), ContextCreationError> {
        self.spawn_blocking(|p| p.begin_transaction()).await
    }

    /// Sends `message` to the data topic and accounts for it in the staged statistics.
    ///
    /// A `materialize-timestamp` header carrying `time` is attached to the record. Message
    /// headers whose key starts with `materialize-` are dropped. The target partition is
    /// `message.hash` modulo the current partition count.
    ///
    /// # Panics
    ///
    /// Panics if `diff` is not `Diff::ONE`, if the current partition count is zero, or if the
    /// computed partition does not fit in an `i32`.
    fn send(
        &mut self,
        message: &KafkaMessage,
        time: Timestamp,
        diff: Diff,
    ) -> Result<(), KafkaError> {
        assert_eq!(diff, Diff::ONE, "invalid sink update");

        let mut headers = OwnedHeaders::new().insert(Header {
            key: "materialize-timestamp",
            value: Some(time.to_string().as_bytes()),
        });
        for header in &message.headers {
            // The `materialize-` prefix is reserved for headers set by this producer.
            if header.key.starts_with("materialize-") {
                continue;
            }

            headers = headers.insert(Header {
                key: header.key.as_str(),
                value: header.value.as_ref(),
            });
        }

        let pc = self
            .partition_count
            .load(std::sync::atomic::Ordering::SeqCst);
        let partition = Some(i32::try_from(message.hash % pc).unwrap());

        let record = BaseRecord {
            topic: &self.data_topic,
            key: message.key.as_ref(),
            payload: message.value.as_ref(),
            headers: Some(headers),
            partition,
            timestamp: None,
            delivery_opaque: (),
        };
        // The recorded size covers the key, the value and the message's own headers; the
        // `materialize-timestamp` header added above is not counted.
        let key_size = message.key.as_ref().map(|k| k.len()).unwrap_or(0);
        let value_size = message.value.as_ref().map(|k| k.len()).unwrap_or(0);
        let headers_size = message
            .headers
            .iter()
            .map(|h| h.key.len() + h.value.as_ref().map(|v| v.len()).unwrap_or(0))
            .sum::<usize>();
        let record_size = u64::cast_from(key_size + value_size + headers_size);
        self.statistics.inc_messages_staged_by(1);
        self.staged_messages += 1;
        self.statistics.inc_bytes_staged_by(record_size);
        self.staged_bytes += record_size;
        self.producer.send(record).map_err(|(e, _)| e)
    }

    /// Writes a progress record for `upper` to the progress topic and commits the current
    /// transaction.
    ///
    /// On success the staged message and byte counts are added to the committed statistics and
    /// reset. If the commit fails with a transaction error that requires an abort, one attempt
    /// is made to abort the transaction before the original error is returned.
    async fn commit_transaction(
        &mut self,
        upper: Antichain<Timestamp>,
    ) -> Result<(), ContextCreationError> {
        let progress = ProgressRecord {
            frontier: upper,
            version: self.sink_version,
        };
        let payload = serde_json::to_vec(&progress).expect("infallible");
        let record = BaseRecord::to(&self.progress_topic)
            .payload(&payload)
            .key(&self.progress_key);
        self.producer.send(record).map_err(|(e, _)| e)?;

        fail::fail_point!("kafka_sink_commit_transaction");

        let timeout = self.transaction_timeout;
        match self
            .spawn_blocking(move |p| p.commit_transaction(timeout))
            .await
        {
            Ok(()) => {
                self.statistics
                    .inc_messages_committed_by(self.staged_messages);
                self.statistics.inc_bytes_committed_by(self.staged_bytes);
                self.staged_messages = 0;
                self.staged_bytes = 0;
                Ok(())
            }
            Err(ContextCreationError::KafkaError(KafkaError::Transaction(err))) => {
                // Attempt to abort once; a failure to abort is returned instead of `err`.
                if err.txn_requires_abort() {
                    let timeout = self.socket_timeout;
                    self.spawn_blocking(move |p| p.abort_transaction(timeout))
                        .await?;
                }
                Err(ContextCreationError::KafkaError(KafkaError::Transaction(
                    err,
                )))
            }
            Err(err) => Err(err),
        }
    }
}
554
555async fn collect_statistics(
557 mut receiver: watch::Receiver<Statistics>,
558 metrics: Arc<KafkaSinkMetrics>,
559) {
560 let mut outbuf_cnt: i64 = 0;
561 let mut outbuf_msg_cnt: i64 = 0;
562 let mut waitresp_cnt: i64 = 0;
563 let mut waitresp_msg_cnt: i64 = 0;
564 let mut txerrs: u64 = 0;
565 let mut txretries: u64 = 0;
566 let mut req_timeouts: u64 = 0;
567 let mut connects: i64 = 0;
568 let mut disconnects: i64 = 0;
569 while receiver.changed().await.is_ok() {
570 let stats = receiver.borrow();
571 for broker in stats.brokers.values() {
572 outbuf_cnt += broker.outbuf_cnt;
573 outbuf_msg_cnt += broker.outbuf_msg_cnt;
574 waitresp_cnt += broker.waitresp_cnt;
575 waitresp_msg_cnt += broker.waitresp_msg_cnt;
576 txerrs += broker.txerrs;
577 txretries += broker.txretries;
578 req_timeouts += broker.req_timeouts;
579 connects += broker.connects.unwrap_or(0);
580 disconnects += broker.disconnects.unwrap_or(0);
581 }
582 metrics.rdkafka_msg_cnt.set(stats.msg_cnt);
583 metrics.rdkafka_msg_size.set(stats.msg_size);
584 metrics.rdkafka_txmsgs.set(stats.txmsgs);
585 metrics.rdkafka_txmsg_bytes.set(stats.txmsg_bytes);
586 metrics.rdkafka_tx.set(stats.tx);
587 metrics.rdkafka_tx_bytes.set(stats.tx_bytes);
588 metrics.rdkafka_outbuf_cnt.set(outbuf_cnt);
589 metrics.rdkafka_outbuf_msg_cnt.set(outbuf_msg_cnt);
590 metrics.rdkafka_waitresp_cnt.set(waitresp_cnt);
591 metrics.rdkafka_waitresp_msg_cnt.set(waitresp_msg_cnt);
592 metrics.rdkafka_txerrs.set(txerrs);
593 metrics.rdkafka_txretries.set(txretries);
594 metrics.rdkafka_req_timeouts.set(req_timeouts);
595 metrics.rdkafka_connects.set(connects);
596 metrics.rdkafka_disconnects.set(disconnects);
597 }
598}
599
/// A message that is ready to be produced to the data topic.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct KafkaMessage {
    /// Hash used to select the target partition (taken modulo the topic's partition count).
    hash: u64,
    /// The optional key of the Kafka record.
    key: Option<Vec<u8>>,
    /// The optional payload of the Kafka record.
    value: Option<Vec<u8>>,
    /// Headers to attach to the Kafka record.
    headers: Vec<KafkaHeader>,
}
612
/// A single header attached to a [`KafkaMessage`].
#[derive(Debug, Clone, Serialize, Deserialize)]
struct KafkaHeader {
    /// The header key.
    key: String,
    /// The optional header value.
    value: Option<Vec<u8>>,
}
621
/// Builds the operator that writes a collection of encoded messages to Kafka.
///
/// Only one worker (selected by hashing `sink_id`) performs the writes; all input is routed to
/// that worker. Updates are produced inside Kafka transactions that are committed whenever the
/// input frontier advances. After each commit the upper of the persist shard behind
/// `write_handle` is advanced to the committed frontier and `write_frontier` is updated.
///
/// Returns a stream of halting health statuses derived from the operator's errors and a button
/// that shuts the operator down when dropped.
fn sink_collection<G: Scope<Timestamp = Timestamp>>(
    name: String,
    input: &Collection<G, KafkaMessage, Diff>,
    sink_id: GlobalId,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    metrics: KafkaSinkMetrics,
    statistics: SinkStatistics,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
) -> (Stream<G, HealthStatusMessage>, PressOnDropButton) {
    let scope = input.scope();
    let mut builder = AsyncOperatorBuilder::new(name.clone(), input.inner.scope());

    // Pick a single worker to run the sink; every worker exchanges its data to that worker.
    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let buffer_min_capacity =
        KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS.handle(storage_configuration.config_set());

    let mut input = builder.new_disconnected_input(&input.inner, Exchange::new(move |_| hashed_id));

    let as_of = sink.as_of.clone();
    let sink_version = sink.version;
    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            // Inactive workers have nothing to write; they report an empty write frontier.
            if !is_active_worker {
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            fail::fail_point!("kafka_sink_creation_error", |_| Err(
                ContextCreationError::Other(anyhow::anyhow!("synthetic error"))
            ));

            let mut write_handle = write_handle.await?;

            let metrics = Arc::new(metrics);

            let (mut producer, resume_upper) = TransactionalProducer::new(
                sink_id,
                &connection,
                &storage_configuration,
                Arc::clone(&metrics),
                statistics,
                sink_version,
            )
            .await?;

            // The input is considered overcompacted if..
            let overcompacted =
                // ..some progress has been recorded before
                *resume_upper != [Timestamp::minimum()] &&
                // ..but `as_of` is not less than or equal to the recorded progress.
                !PartialOrder::less_equal(&as_of, &resume_upper);
            if overcompacted {
                let err = format!(
                    "{name}: input compacted past resume upper: as_of {}, resume_upper: {}",
                    as_of.pretty(),
                    resume_upper.pretty()
                );
                error!("{err}");
                return Err(anyhow!("{err}").into());
            }

            info!(
                "{name}: as_of: {}, resume upper: {}",
                as_of.pretty(),
                resume_upper.pretty()
            );

            // An empty resume upper means there is nothing left to write.
            let Some(mut upper) = resume_upper.clone().into_option() else {
                write_frontier.borrow_mut().clear();
                return Ok(());
            };

            // Updates at times beyond `upper` that cannot be sent yet.
            let mut deferred_updates = vec![];
            // Scratch buffer for the deferred updates that are released by a progress update.
            let mut extra_updates = vec![];
            // Transactions are begun lazily; this tracks whether one is currently open.
            let mut transaction_begun = false;
            while let Some(event) = input.next().await {
                match event {
                    Event::Data(_cap, batch) => {
                        for (message, time, diff) in batch {
                            match upper.cmp(&time) {
                                // Beyond `upper`: buffer until the frontier passes this time.
                                Ordering::Less => deferred_updates.push((message, time, diff)),
                                // Exactly at `upper`: send right away without buffering.
                                Ordering::Equal => {
                                    if !transaction_begun {
                                        producer.begin_transaction().await?;
                                        transaction_begun = true;
                                    }
                                    producer.send(&message, time, diff)?;
                                }
                                // Before `upper`: skip the update.
                                Ordering::Greater => continue,
                            }
                        }
                    }
                    Event::Progress(progress) => {
                        // Ignore progress updates that are not at or beyond the resume upper.
                        if !PartialOrder::less_equal(&resume_upper, &progress) {
                            continue;
                        }
                        // Also ignore progress updates until the frontier has moved past every
                        // time in `as_of`.
                        if !as_of.iter().all(|t| !progress.less_equal(t)) {
                            continue;
                        }
                        // A transaction is needed to commit the progress record even if no data
                        // was sent. The flag is reset after the commit below.
                        if !transaction_begun {
                            producer.begin_transaction().await?;
                        }

                        // Give back excess capacity, then move every deferred update that is no
                        // longer beyond `progress` into `extra_updates`, ordered by time.
                        deferred_updates.shrink_to(buffer_min_capacity.get());
                        extra_updates.extend(
                            deferred_updates
                                .extract_if(.., |(_, time, _)| !progress.less_equal(time)),
                        );
                        extra_updates.sort_unstable_by(|a, b| a.1.cmp(&b.1));

                        extra_updates.shrink_to(buffer_min_capacity.get());
                        for (message, time, diff) in extra_updates.drain(..) {
                            producer.send(&message, time, diff)?;
                        }

                        debug!("{name}: committing transaction for {}", progress.pretty());
                        producer.commit_transaction(progress.clone()).await?;
                        transaction_begun = false;
                        // Advance the upper of the persist shard to `progress`, retrying with
                        // the current upper whenever the expected upper did not match.
                        let mut expect_upper = write_handle.shared_upper();
                        loop {
                            if PartialOrder::less_equal(&progress, &expect_upper) {
                                // The shard upper is already at or beyond `progress`.
                                break;
                            }
                            const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                            match write_handle
                                .compare_and_append(EMPTY, expect_upper, progress.clone())
                                .await
                                .expect("valid usage")
                            {
                                Ok(()) => break,
                                Err(mismatch) => {
                                    expect_upper = mismatch.current;
                                }
                            }
                        }
                        write_frontier.borrow_mut().clone_from(&progress);
                        // An empty frontier means the input is complete.
                        match progress.into_option() {
                            Some(new_upper) => upper = new_upper,
                            None => break,
                        }
                    }
                }
            }
            Ok(())
        })
    });

    // Turn operator errors into halting health statuses.
    let statuses = errors.map(|error: Rc<ContextCreationError>| {
        // Retriable transaction timeouts get an additional hint about broker configuration.
        let hint = match *error {
            ContextCreationError::KafkaError(KafkaError::Transaction(ref e)) => {
                if e.is_retriable() && e.code() == RDKafkaErrorCode::OperationTimedOut {
                    let hint = "If you're running a single Kafka broker, ensure that the configs \
                        transaction.state.log.replication.factor, transaction.state.log.min.isr, \
                        and offsets.topic.replication.factor are set to 1 on the broker";
                    Some(hint.to_owned())
                } else {
                    None
                }
            }
            _ => None,
        };

        HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), hint),
            // SSH errors are reported under their own namespace.
            namespace: if matches!(*error, ContextCreationError::Ssh(_)) {
                StatusNamespace::Ssh
            } else {
                StatusNamespace::Kafka
            },
        }
    });

    (statuses, button.press_on_drop())
}
858
/// Determines the latest progress record written for `sink_id`, if any.
///
/// Ensures the progress topic exists (according to the configured topic check mode) and then
/// scans it in a blocking task for the most recent record keyed by this sink's progress key.
/// Returns `Ok(None)` if the progress topic does not exist or contains no record for this sink.
///
/// # Errors
///
/// Returns an error if a consumer cannot be created, if the progress topic cannot be ensured,
/// if the progress topic does not have exactly one partition, or if reading from it fails.
async fn determine_sink_progress(
    sink_id: GlobalId,
    connection: &KafkaSinkConnection,
    storage_configuration: &StorageConfiguration,
    metrics: Arc<KafkaSinkMetrics>,
) -> Result<Option<ProgressRecord>, ContextCreationError> {
    let TimeoutConfig {
        fetch_metadata_timeout,
        progress_record_fetch_timeout,
        ..
    } = storage_configuration.parameters.kafka_timeout_config;

    let client_id = connection.client_id(
        storage_configuration.config_set(),
        &storage_configuration.connection_context,
        sink_id,
    );
    let group_id = connection.progress_group_id(&storage_configuration.connection_context, sink_id);
    let progress_topic = connection
        .progress_topic(&storage_configuration.connection_context)
        .into_owned();
    let progress_topic_options = &connection.connection.progress_topic_options;
    let progress_key = ProgressKey::new(sink_id);

    // Options shared by both consumers created below.
    let common_options = btreemap! {
        "group.id" => group_id,
        "client.id" => client_id,
        "enable.auto.commit" => "false".into(),
        "auto.offset.reset" => "earliest".into(),
        // Partition EOF events are handled explicitly in `progress_search`.
        "enable.partition.eof" => "true".into(),
    };

    // Consumer used to read the progress records themselves.
    let progress_client_read_committed: BaseConsumer<_> = {
        let mut opts = common_options.clone();
        opts.insert("isolation.level", "read_committed".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    // Consumer used to fetch the watermarks of the progress topic.
    let progress_client_read_uncommitted: BaseConsumer<_> = {
        let mut opts = common_options;
        opts.insert("isolation.level", "read_uncommitted".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    let ctx = Arc::clone(progress_client_read_committed.client().context());

    // Unknown values of the config fall back to skipping the topic config checks.
    let ensure_topic_config =
        match &*SINK_ENSURE_TOPIC_CONFIG.get(storage_configuration.config_set()) {
            "skip" => EnsureTopicConfig::Skip,
            "check" => EnsureTopicConfig::Check,
            "alter" => EnsureTopicConfig::Alter,
            _ => {
                tracing::warn!(
                    topic = progress_topic,
                    "unexpected value for ensure-topic-config; skipping checks"
                );
                EnsureTopicConfig::Skip
            }
        };
    mz_storage_client::sink::ensure_kafka_topic(
        connection,
        storage_configuration,
        &progress_topic,
        progress_topic_options,
        ensure_topic_config,
    )
    .await
    .add_context("error registering kafka progress topic for sink")?;

    // The blocking task holds a weak reference to `parent_token` and checks it while searching
    // (see `progress_search`), so the search stops once the token has been dropped.
    let parent_token = Arc::new(());
    let child_token = Arc::downgrade(&parent_token);
    let task_name = format!("get_latest_ts:{sink_id}");
    let sink_progress_search = SINK_PROGRESS_SEARCH.get(storage_configuration.config_set());
    let result = task::spawn_blocking(|| task_name, move || {
        let progress_topic = progress_topic.as_ref();
        let partitions = match mz_kafka_util::client::get_partitions(
            progress_client_read_committed.client(),
            progress_topic,
            fetch_metadata_timeout,
        ) {
            Ok(partitions) => partitions,
            // A missing progress topic means no progress has been recorded.
            Err(GetPartitionsError::TopicDoesNotExist) => {
                return Ok(None);
            }
            e => e.with_context(|| {
                format!(
                    "Unable to fetch metadata about progress topic {}",
                    progress_topic
                )
            })?,
        };
        if partitions.len() != 1 {
            bail!(
                "Progress topic {} should contain a single partition, but instead contains {} partitions",
                progress_topic, partitions.len(),
            );
        }
        let partition = partitions.into_element();

        metrics.consumed_progress_records.set(0);

        // The watermarks bound the range of offsets that is searched.
        let (lo, hi) = progress_client_read_uncommitted
            .fetch_watermarks(progress_topic, partition, fetch_metadata_timeout)
            .map_err(|e| {
                anyhow!(
                    "Failed to fetch metadata while reading from progress topic: {}",
                    e
                )
            })?;

        // Candidate start offsets for the search. The full range starting at `lo` is always a
        // candidate. When the progress search is enabled, progressively shorter suffixes of the
        // topic are added as well (each lookback a tenth of the previous one, down to 20_000
        // records).
        let mut start_indices = vec![lo];
        if sink_progress_search {
            let mut lookback = hi.saturating_sub(lo) / 10;
            while lookback >= 20_000 {
                start_indices.push(hi - lookback);
                lookback /= 10;
            }
        }
        // Try the shortest suffix first and fall back to longer ranges if nothing is found.
        for lo in start_indices.into_iter().rev() {
            if let Some(found) = progress_search(
                &progress_client_read_committed,
                progress_record_fetch_timeout,
                progress_topic,
                partition,
                lo,
                hi,
                progress_key.clone(),
                Weak::clone(&child_token),
                Arc::clone(&metrics)
            )? {
                return Ok(Some(found));
            }
        }
        Ok(None)
    }).await.unwrap().check_ssh_status(&ctx);
    // Keep the token alive until the blocking task has finished.
    drop(parent_token);
    result
}
1075
/// Scans the progress topic from offset `lo` up to offset `hi` and returns the last progress
/// record found for `progress_key`, if any.
///
/// The consumer is assigned to `partition` of `progress_topic` starting at `lo`. Records with a
/// different key and records without a payload are skipped.
///
/// # Errors
///
/// Returns an error if the consumer cannot be assigned, if `child_token` can no longer be
/// upgraded ("operation cancelled"), if polling fails or times out before `hi` is reached, if a
/// record cannot be parsed, or if the frontier of a later record is not greater than or equal
/// to the frontier of the previously read record.
fn progress_search<C: ConsumerContext + 'static>(
    progress_client_read_committed: &BaseConsumer<C>,
    progress_record_fetch_timeout: Duration,
    progress_topic: &str,
    partition: i32,
    lo: i64,
    hi: i64,
    progress_key: ProgressKey,
    child_token: Weak<()>,
    metrics: Arc<KafkaSinkMetrics>,
) -> anyhow::Result<Option<ProgressRecord>> {
    // Start reading the partition at offset `lo`.
    let mut tps = TopicPartitionList::new();
    tps.add_partition(progress_topic, partition);
    tps.set_partition_offset(progress_topic, partition, Offset::Offset(lo))?;
    progress_client_read_committed
        .assign(&tps)
        .with_context(|| {
            format!(
                "Error seeking in progress topic {}:{}",
                progress_topic, partition
            )
        })?;

    // Returns the consumer's current position in the partition and updates the metric tracking
    // the number of outstanding records. Fails once the cancellation token has been dropped.
    let get_position = || {
        if child_token.strong_count() == 0 {
            bail!("operation cancelled");
        }
        let position = progress_client_read_committed
            .position()?
            .find_partition(progress_topic, partition)
            .ok_or_else(|| {
                anyhow!(
                    "No position info found for progress topic {}",
                    progress_topic
                )
            })?
            .offset();
        let position = match position {
            Offset::Offset(position) => position,
            // An invalid offset is treated as the starting offset.
            // NOTE(review): presumably reported before the first message is consumed -- confirm.
            Offset::Invalid => lo,
            _ => bail!(
                "Consumer::position returned offset of wrong type: {:?}",
                position
            ),
        };
        // Clamp at zero in case the position is already beyond `hi`.
        let outstanding = u64::try_from(std::cmp::max(0, hi - position)).unwrap();
        metrics.outstanding_progress_records.set(outstanding);
        Ok(position)
    };

    info!("fetching latest progress record for {progress_key}, lo/hi: {lo}/{hi}");

    let mut last_progress: Option<ProgressRecord> = None;
    loop {
        let current_position = get_position()?;

        // Stop once the consumer has reached the high watermark.
        if current_position >= hi {
            break;
        }

        let message = match progress_client_read_committed.poll(progress_record_fetch_timeout) {
            Some(Ok(message)) => message,
            Some(Err(KafkaError::PartitionEOF(_))) => {
                // No message was returned; go around the loop to re-check the position.
                continue;
            }
            Some(Err(e)) => bail!("failed to fetch progress message {e}"),
            None => {
                bail!(
                    "timed out while waiting to reach high water mark of non-empty \
                    topic {progress_topic}:{partition}, lo/hi: {lo}/{hi}, current position: {current_position}"
                );
            }
        };

        // Skip records that were written with a different progress key.
        if message.key() != Some(progress_key.to_bytes()) {
            continue;
        }

        metrics.consumed_progress_records.inc();

        let Some(payload) = message.payload() else {
            continue;
        };
        let progress = parse_progress_record(payload)?;

        // Frontiers of successive records must never regress.
        match last_progress {
            Some(last_progress)
                if !PartialOrder::less_equal(&last_progress.frontier, &progress.frontier) =>
            {
                bail!(
                    "upper regressed in topic {progress_topic}:{partition} from {:?} to {:?}",
                    &last_progress.frontier,
                    &progress.frontier,
                );
            }
            _ => last_progress = Some(progress),
        }
    }

    Ok(last_progress)
}
1221
/// The legacy format of progress records, which carries a single optional timestamp instead of
/// a frontier.
///
/// `parse_progress_record` falls back to this format when a payload cannot be parsed as a
/// [`ProgressRecord`].
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct LegacyProgressRecord {
    /// The outer `Option` is `None` when the field is absent from the payload. The inner
    /// `Option` holds the deserialized value of the field when it is present.
    #[serde(default, deserialize_with = "deserialize_some")]
    pub timestamp: Option<Option<Timestamp>>,
}
1235
1236fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
1238where
1239 T: Deserialize<'de>,
1240 D: Deserializer<'de>,
1241{
1242 Deserialize::deserialize(deserializer).map(Some)
1243}
1244
/// A progress record as written to the progress topic when a transaction is committed.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ProgressRecord {
    /// The frontier recorded by the commit, (de)serialized as a list of timestamps.
    #[serde(
        deserialize_with = "deserialize_frontier",
        serialize_with = "serialize_frontier"
    )]
    pub frontier: Antichain<Timestamp>,
    /// The version of the sink that wrote the record. Defaults to zero when absent.
    #[serde(default)]
    pub version: u64,
}
1257fn serialize_frontier<S>(frontier: &Antichain<Timestamp>, serializer: S) -> Result<S::Ok, S::Error>
1258where
1259 S: Serializer,
1260{
1261 Serialize::serialize(frontier.elements(), serializer)
1262}
1263
1264fn deserialize_frontier<'de, D>(deserializer: D) -> Result<Antichain<Timestamp>, D::Error>
1265where
1266 D: Deserializer<'de>,
1267{
1268 let times: Vec<Timestamp> = Deserialize::deserialize(deserializer)?;
1269 Ok(Antichain::from(times))
1270}
1271
1272fn parse_progress_record(payload: &[u8]) -> Result<ProgressRecord, anyhow::Error> {
1273 Ok(match serde_json::from_slice::<ProgressRecord>(payload) {
1274 Ok(progress) => progress,
1275 Err(_) => match serde_json::from_slice::<LegacyProgressRecord>(payload) {
1277 Ok(LegacyProgressRecord {
1278 timestamp: Some(Some(time)),
1279 }) => ProgressRecord {
1280 frontier: Antichain::from_elem(time.step_forward()),
1281 version: 0,
1282 },
1283 Ok(LegacyProgressRecord {
1284 timestamp: Some(None),
1285 }) => ProgressRecord {
1286 frontier: Antichain::new(),
1287 version: 0,
1288 },
1289 _ => match std::str::from_utf8(payload) {
1290 Ok(payload) => bail!("invalid progress record: {payload}"),
1291 Err(_) => bail!("invalid progress record bytes: {payload:?}"),
1292 },
1293 },
1294 })
1295}
1296
1297async fn fetch_partition_count(
1299 producer: &ThreadedProducer<TunnelingClientContext<MzClientContext>>,
1300 sink_id: GlobalId,
1301 topic_name: &str,
1302) -> Result<u64, anyhow::Error> {
1303 let meta = task::spawn_blocking(|| format!("kafka_sink_fetch_partition_count:{sink_id}"), {
1304 let producer = producer.clone();
1305 move || {
1306 producer
1307 .client()
1308 .fetch_metadata(None, DEFAULT_FETCH_METADATA_TIMEOUT)
1309 }
1310 })
1311 .await
1312 .expect("spawning blocking task cannot fail")
1313 .check_ssh_status(producer.context())?;
1314
1315 match meta.topics().iter().find(|t| t.name() == topic_name) {
1316 Some(topic) => {
1317 let partition_count = u64::cast_from(topic.partitions().len());
1318 if partition_count == 0 {
1319 bail!("topic {topic_name} has an impossible partition count of zero");
1320 }
1321 Ok(partition_count)
1322 }
1323 None => bail!("topic {topic_name} does not exist"),
1324 }
1325}
1326
1327async fn fetch_partition_count_loop<F>(
1333 producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
1334 sink_id: GlobalId,
1335 topic_name: String,
1336 interval: Duration,
1337 update_partition_count: Arc<F>,
1338) where
1339 F: Fn(u64),
1340{
1341 let mut interval = time::interval(interval);
1342 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
1343 loop {
1344 interval.tick().await;
1345 match fetch_partition_count(&producer, sink_id, &topic_name).await {
1346 Ok(pc) => update_partition_count(pc),
1347 Err(e) => {
1348 warn!(%sink_id, "failed updating partition count: {e}");
1349 continue;
1350 }
1351 };
1352 }
1353}
1354
/// Encodes a collection of keyed before/after row pairs into [`KafkaMessage`]s.
///
/// An async operator called `name` is built in the scope of `input`. It first
/// constructs the key and value encoders dictated by `connection.format`
/// (connecting to the schema registry and publishing schemas for Avro), and
/// then turns every incoming update into a `KafkaMessage` carrying the encoded
/// key, the encoded value, any headers, and a partitioning hash.
///
/// Returns, in order:
/// * the collection of encoded messages,
/// * a stream of health status messages, one halting status per error the
///   operator reports,
/// * a button (pressed on drop) obtained from the operator builder.
fn encode_collection<G: Scope>(
    name: String,
    input: &Collection<G, (Option<Row>, DiffPair<Row>), Diff>,
    envelope: SinkEnvelope,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
) -> (
    Collection<G, KafkaMessage, Diff>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
) {
    let mut builder = AsyncOperatorBuilder::new(name, input.inner.scope());

    let (output, stream) = builder.new_output();
    let mut input = builder.new_input_for(&input.inner, Pipeline, &output);

    let (button, errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            // The operator has exactly one output, hence exactly one capability set.
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let key_desc = connection
                .key_desc_and_indices
                .as_ref()
                .map(|(desc, _indices)| desc.clone());
            let value_desc = connection.value_desc;

            // A key encoder exists only if both a key description and a key
            // format are configured; having one without the other is an error.
            let key_encoder: Option<Box<dyn Encode>> =
                match (key_desc, connection.format.key_format) {
                    (Some(desc), Some(KafkaSinkFormatType::Bytes)) => {
                        Some(Box::new(BinaryEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Text)) => {
                        Some(Box::new(TextEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Json)) => {
                        Some(Box::new(JsonEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Avro {
                        schema,
                        compatibility_level,
                        csr_connection,
                    })) => {
                        // Avro needs a schema id, which is obtained by publishing
                        // the key schema through the schema registry connection.
                        let ccsr = csr_connection
                            .connect(&storage_configuration, InTask::Yes)
                            .await?;

                        // NOTE(review): "<topic>-key" is presumably the registry
                        // subject name — confirm against `publish_kafka_schema`.
                        let schema_id = mz_storage_client::sink::publish_kafka_schema(
                            ccsr,
                            format!("{}-key", connection.topic),
                            schema.clone(),
                            mz_ccsr::SchemaType::Avro,
                            compatibility_level,
                        )
                        .await
                        .context("error publishing kafka schemas for sink")?;

                        Some(Box::new(AvroEncoder::new(desc, false, &schema, schema_id)))
                    }
                    (None, None) => None,
                    (desc, format) => {
                        return Err(anyhow!(
                            "key_desc and key_format must be both set or both unset, but key_desc: {:?}, key_format: {:?}",
                            desc,
                            format
                        ))
                    }
                };

            // Unlike the key encoders (which always receive `false`), the value
            // encoders are told whether the Debezium envelope is in use.
            let debezium = matches!(envelope, SinkEnvelope::Debezium);

            let value_encoder: Box<dyn Encode> = match connection.format.value_format {
                KafkaSinkFormatType::Bytes => Box::new(BinaryEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Text => Box::new(TextEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Json => Box::new(JsonEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level,
                    csr_connection,
                } => {
                    // Same procedure as for the key, but published as "<topic>-value".
                    let ccsr = csr_connection
                        .connect(&storage_configuration, InTask::Yes)
                        .await?;

                    let schema_id = mz_storage_client::sink::publish_kafka_schema(
                        ccsr,
                        format!("{}-value", connection.topic),
                        schema.clone(),
                        mz_ccsr::SchemaType::Avro,
                        compatibility_level,
                    )
                    .await
                    .context("error publishing kafka schemas for sink")?;

                    Box::new(AvroEncoder::new(value_desc, debezium, &schema, schema_id))
                }
            };

            // Replace the initial capability set with an empty one. From here on
            // output is produced only with the capability delivered alongside
            // each input event. Note that nothing below this point uses `?`.
            *capset = CapabilitySet::new();

            // Scratch buffers reused across all rows.
            let mut row_buf = Row::default();
            let mut datums = DatumVec::new();

            while let Some(event) = input.next().await {
                // Only data events are processed; every other event is ignored.
                if let Event::Data(cap, rows) = event {
                    for ((key, value), time, diff) in rows {
                        let mut hash = None;
                        let mut headers = vec![];
                        if connection.headers_index.is_some() || connection.partition_by.is_some() {
                            // Headers and the partitioning hash are computed from a
                            // single row: `after` when present, otherwise `before`.
                            let row = value
                                .after
                                .as_ref()
                                .or(value.before.as_ref())
                                .expect("one of before or after must be set");
                            let row = datums.borrow_with(row);

                            if let Some(i) = connection.headers_index {
                                headers = encode_headers(row[i]);
                            }

                            if let Some(partition_by) = &connection.partition_by {
                                hash = Some(evaluate_partition_by(partition_by, &row));
                            }
                        }
                        // Hash precedence: an explicit partition-by result wins;
                        // otherwise the key encoder's hash of the encoded key is
                        // used; keyless messages without partition-by hash to 0.
                        let (key, hash) = match key {
                            Some(key) => {
                                let key_encoder = key_encoder.as_ref().expect("key present");
                                let key = key_encoder.encode_unchecked(key);
                                let hash = hash.unwrap_or_else(|| key_encoder.hash(&key));
                                (Some(key), hash)
                            }
                            None => (None, hash.unwrap_or(0))
                        };
                        // Upsert emits only the `after` row (so the value is absent
                        // when there is no `after`); Debezium packs the whole diff
                        // pair into `row_buf` and emits a copy of it.
                        let value = match envelope {
                            SinkEnvelope::Upsert => value.after,
                            SinkEnvelope::Debezium => {
                                dbz_format(&mut row_buf.packer(), value);
                                Some(row_buf.clone())
                            }
                        };
                        let value = value.map(|value| value_encoder.encode_unchecked(value));
                        let message = KafkaMessage {
                            hash,
                            key,
                            value,
                            headers,
                        };
                        output.give(&cap, (message, time, diff));
                    }
                }
            }
            Ok::<(), anyhow::Error>(())
        })
    });

    // Every error reported by the operator becomes a halting health status in
    // the Kafka namespace.
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Kafka,
    });

    (stream.as_collection(), statuses, button.press_on_drop())
}
1545
1546fn encode_headers(datum: Datum) -> Vec<KafkaHeader> {
1547 let mut out = vec![];
1548 if datum.is_null() {
1549 return out;
1550 }
1551 for (key, value) in datum.unwrap_map().iter() {
1552 out.push(KafkaHeader {
1553 key: key.into(),
1554 value: match value {
1555 Datum::Null => None,
1556 Datum::String(s) => Some(s.as_bytes().to_vec()),
1557 Datum::Bytes(b) => Some(b.to_vec()),
1558 _ => panic!("encode_headers called with unexpected header value {value:?}"),
1559 },
1560 })
1561 }
1562 out
1563}
1564
1565fn evaluate_partition_by(partition_by: &MirScalarExpr, row: &[Datum]) -> u64 {
1573 let temp_storage = RowArena::new();
1581 match partition_by.eval(row, &temp_storage) {
1582 Ok(hash) => match hash {
1583 Datum::Int32(i) => i.try_into().unwrap_or(0),
1584 Datum::Int64(i) => i.try_into().unwrap_or(0),
1585 Datum::UInt32(u) => u64::from(u),
1586 Datum::UInt64(u) => u,
1587 _ => unreachable!(),
1588 },
1589 Err(_) => 0,
1590 }
1591}
1592
#[cfg(test)]
mod test {
    use mz_ore::assert_err;

    use super::*;

    /// Checks that `parse_progress_record` understands both the legacy
    /// `timestamp` layout and the `frontier` layout, and rejects payloads
    /// that match neither.
    #[mz_ore::test]
    fn progress_record_migration() {
        // Payloads that must fail to parse.
        let rejected: [&[u8]; 2] = [b"{}", b"{\"frontier\":null}"];
        for payload in rejected {
            assert_err!(parse_progress_record(payload));
        }

        // Payloads that must parse, paired with the record they should yield.
        let accepted: [(&[u8], ProgressRecord); 5] = [
            // A legacy timestamp is stepped forward to form the frontier.
            (
                b"{\"timestamp\":1}",
                ProgressRecord {
                    frontier: Antichain::from_elem(2.into()),
                    version: 0,
                },
            ),
            // A legacy null timestamp means the empty frontier.
            (
                b"{\"timestamp\":null}",
                ProgressRecord {
                    frontier: Antichain::new(),
                    version: 0,
                },
            ),
            (
                b"{\"frontier\":[1]}",
                ProgressRecord {
                    frontier: Antichain::from_elem(1.into()),
                    version: 0,
                },
            ),
            (
                b"{\"frontier\":[]}",
                ProgressRecord {
                    frontier: Antichain::new(),
                    version: 0,
                },
            ),
            // An explicit version is carried through.
            (
                b"{\"frontier\":[], \"version\": 42}",
                ProgressRecord {
                    frontier: Antichain::new(),
                    version: 42,
                },
            ),
        ];
        for (payload, expected) in accepted {
            assert_eq!(parse_progress_record(payload).unwrap(), expected);
        }
    }
}