use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::BTreeMap;
use std::future::Future;
use std::rc::Rc;
use std::sync::atomic::AtomicU64;
use std::sync::{Arc, Weak};
use std::time::Duration;

use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
use crate::metrics::sink::kafka::KafkaSinkMetrics;
use crate::render::sinks::SinkRender;
use crate::statistics::SinkStatistics;
use crate::storage_state::StorageState;
use anyhow::{Context, anyhow, bail};
use differential_dataflow::{AsCollection, Hashable, VecCollection};
use futures::StreamExt;
use maplit::btreemap;
use mz_expr::MirScalarExpr;
use mz_interchange::avro::{AvroEncoder, DiffPair};
use mz_interchange::encode::Encode;
use mz_interchange::envelopes::dbz_format;
use mz_interchange::json::JsonEncoder;
use mz_interchange::text_binary::{BinaryEncoder, TextEncoder};
use mz_kafka_util::admin::EnsureTopicConfig;
use mz_kafka_util::client::{
    DEFAULT_FETCH_METADATA_TIMEOUT, GetPartitionsError, MzClientContext, TimeoutConfig,
    TunnelingClientContext,
};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::error::ErrorExt;
use mz_ore::future::InTask;
use mz_ore::soft_assert_or_log;
use mz_ore::task::{self, AbortOnDropHandle};
use mz_persist_client::Diagnostics;
use mz_persist_client::write::WriteHandle;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_client::sink::progress_key::ProgressKey;
use mz_storage_types::StorageDiff;
use mz_storage_types::configuration::StorageConfiguration;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::{
    KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS, SINK_ENSURE_TOPIC_CONFIG, SINK_PROGRESS_SEARCH,
};
use mz_storage_types::errors::{ContextCreationError, ContextCreationErrorExt, DataflowError};
use mz_storage_types::sinks::{
    KafkaSinkConnection, KafkaSinkFormatType, SinkEnvelope, StorageSinkDesc,
};
use mz_storage_types::sources::SourceData;
use mz_timely_util::antichain::AntichainExt;
use mz_timely_util::builder_async::{
    Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
use rdkafka::error::KafkaError;
use rdkafka::message::{Header, OwnedHeaders, ToBytes};
use rdkafka::producer::{BaseRecord, Producer, ThreadedProducer};
use rdkafka::types::RDKafkaErrorCode;
use rdkafka::{Message, Offset, Statistics, TopicPartitionList};
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use timely::PartialOrder;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::{CapabilitySet, Concatenate, Map, ToStream};
use timely::dataflow::{Scope, Stream};
use timely::progress::{Antichain, Timestamp as _};
use tokio::sync::watch;
use tokio::time::{self, MissedTickBehavior};
use tracing::{debug, error, info, warn};
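
// Kafka sink rendering: the input collection is encoded according to the sink's configured
// formats and written to the sink's Kafka topic inside transactions, with progress tracked in a
// dedicated progress topic and mirrored into the sink's persist shard.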
impl<G: Scope<Timestamp = Timestamp>> SinkRender<G> for KafkaSinkConnection {
    fn get_key_indices(&self) -> Option<&[usize]> {
        self.key_desc_and_indices
            .as_ref()
            .map(|(_desc, indices)| indices.as_slice())
    }

    fn get_relation_key_indices(&self) -> Option<&[usize]> {
        self.relation_key_indices.as_deref()
    }

    fn render_sink(
        &self,
        storage_state: &mut StorageState,
        sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
        sink_id: GlobalId,
        input: VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
        _err_collection: VecCollection<G, DataflowError, Diff>,
    ) -> (Stream<G, HealthStatusMessage>, Vec<PressOnDropButton>) {
        let mut scope = input.scope();

        let write_handle = {
            let persist = Arc::clone(&storage_state.persist_clients);
            let shard_meta = sink.to_storage_metadata.clone();
            async move {
                let client = persist.open(shard_meta.persist_location).await?;
                let handle = client
                    .open_writer(
                        shard_meta.data_shard,
                        Arc::new(shard_meta.relation_desc),
                        Arc::new(UnitSchema),
                        Diagnostics::from_purpose("sink handle"),
                    )
                    .await?;
                Ok(handle)
            }
        };

        let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
        storage_state
            .sink_write_frontiers
            .insert(sink_id, Rc::clone(&write_frontier));

        let (encoded, encode_status, encode_token) = encode_collection(
            format!("kafka-{sink_id}-{}-encode", self.format.get_format_name()),
            &input,
            sink.envelope,
            self.clone(),
            storage_state.storage_configuration.clone(),
        );

        let metrics = storage_state.metrics.get_kafka_sink_metrics(sink_id);
        let statistics = storage_state
            .aggregated_statistics
            .get_sink(&sink_id)
            .expect("statistics initialized")
            .clone();

        let (sink_status, sink_token) = sink_collection(
            format!("kafka-{sink_id}-sink"),
            &encoded,
            sink_id,
            self.clone(),
            storage_state.storage_configuration.clone(),
            sink,
            metrics,
            statistics,
            write_handle,
            write_frontier,
        );

        let running_status = Some(HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::Running,
            namespace: StatusNamespace::Kafka,
        })
        .to_stream(&mut scope);

        let status = scope.concatenate([running_status, encode_status, sink_status]);

        (status, vec![encode_token, sink_token])
    }
}
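
/// A Kafka producer whose writes all happen inside Kafka transactions and that tracks the
/// statistics and topic partition count needed by the sink.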
struct TransactionalProducer {
    /// The task name used for any blocking tasks spawned by this producer.
    task_name: String,
    /// The topic that data messages are written to.
    data_topic: String,
    /// The topic that progress records are written to.
    progress_topic: String,
    /// The key under which this sink's progress records are written.
    progress_key: ProgressKey,
    /// The version of this sink, recorded in progress records and compared against the version
    /// found in the progress topic to detect fencing by a newer instance.
    sink_version: u64,
    /// The most recently observed partition count of the data topic.
    partition_count: Arc<AtomicU64>,
    /// Task that periodically refreshes `partition_count`; aborted when the producer is dropped.
    _partition_count_task: AbortOnDropHandle<()>,
    /// The underlying rdkafka producer.
    producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    /// Per-sink statistics, updated as messages are staged and committed.
    statistics: SinkStatistics,
    /// The number of messages staged in the current transaction.
    staged_messages: u64,
    /// The number of bytes staged in the current transaction.
    staged_bytes: u64,
    /// The timeout to use for socket-level operations.
    socket_timeout: Duration,
    /// The timeout to use for transactions.
    transaction_timeout: Duration,
}

impl TransactionalProducer {
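    /// Initializes a transactional producer for the sink identified by `sink_id`: creates the
    /// underlying rdkafka producer, initializes transactions, and determines the frontier from
    /// which the sink must resume, based on the latest record in the progress topic.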
    async fn new(
        sink_id: GlobalId,
        connection: &KafkaSinkConnection,
        storage_configuration: &StorageConfiguration,
        metrics: Arc<KafkaSinkMetrics>,
        statistics: SinkStatistics,
        sink_version: u64,
    ) -> Result<(Self, Antichain<mz_repr::Timestamp>), ContextCreationError> {
        let client_id = connection.client_id(
            storage_configuration.config_set(),
            &storage_configuration.connection_context,
            sink_id,
        );
        let transactional_id =
            connection.transactional_id(&storage_configuration.connection_context, sink_id);

        let timeout_config = &storage_configuration.parameters.kafka_timeout_config;
        let mut options = BTreeMap::new();
        options.insert("enable.idempotence", "true".into());
        options.insert(
            "compression.type",
            connection.compression_type.to_librdkafka_option().into(),
        );
        options.insert("queue.buffering.max.kbytes", "2147483647".into());
        options.insert("queue.buffering.max.messages", "0".into());
        options.insert("queue.buffering.max.ms", format!("{}", 10));
        options.insert(
            "transaction.timeout.ms",
            format!("{}", timeout_config.transaction_timeout.as_millis()),
        );
        options.insert("transactional.id", transactional_id);
        options.insert("client.id", client_id);
        options.insert("statistics.interval.ms", "1000".into());

        let ctx = MzClientContext::default();

        let stats_receiver = ctx.subscribe_statistics();
        let task_name = format!("kafka_sink_metrics_collector:{sink_id}");
        task::spawn(
            || &task_name,
            collect_statistics(stats_receiver, Arc::clone(&metrics)),
        );

        let producer: ThreadedProducer<_> = connection
            .connection
            .create_with_context(storage_configuration, ctx, &options, InTask::Yes)
            .await?;

        let partition_count = Arc::new(AtomicU64::new(0));
        let update_partition_count = {
            let partition_count = Arc::clone(&partition_count);
            let metrics = Arc::clone(&metrics);
            Arc::new(move |pc| {
                partition_count.store(pc, std::sync::atomic::Ordering::SeqCst);
                metrics.partition_count.set(pc);
            })
        };

        let partition_count_task = task::spawn(
            || format!("kafka_sink_producer_fetch_metadata_loop:{sink_id}"),
            fetch_partition_count_loop(
                producer.clone(),
                sink_id,
                connection.topic.clone(),
                connection.topic_metadata_refresh_interval,
                Arc::clone(&update_partition_count),
            ),
        );

        let task_name = format!("kafka_sink_producer:{sink_id}");
        let progress_key = ProgressKey::new(sink_id);

        let producer = Self {
            task_name,
            data_topic: connection.topic.clone(),
            partition_count,
            _partition_count_task: partition_count_task.abort_on_drop(),
            progress_topic: connection
                .progress_topic(&storage_configuration.connection_context)
                .into_owned(),
            progress_key,
            sink_version,
            producer,
            statistics,
            staged_messages: 0,
            staged_bytes: 0,
            socket_timeout: timeout_config.socket_timeout,
            transaction_timeout: timeout_config.transaction_timeout,
        };

        let timeout = timeout_config.socket_timeout;
        producer
            .spawn_blocking(move |p| p.init_transactions(timeout))
            .await?;

        let progress = determine_sink_progress(
            sink_id,
            connection,
            storage_configuration,
            Arc::clone(&metrics),
        )
        .await?;

        let resume_upper = match progress {
            Some(progress) => {
                if sink_version < progress.version {
                    return Err(ContextCreationError::Other(anyhow!(
                        "Fenced off by newer version of the sink. ours={} theirs={}",
                        sink_version,
                        progress.version
                    )));
                }
                progress.frontier
            }
            None => {
                mz_storage_client::sink::ensure_kafka_topic(
                    connection,
                    storage_configuration,
                    &connection.topic,
                    &connection.topic_options,
                    EnsureTopicConfig::Skip,
                )
                .await?;
                Antichain::from_elem(Timestamp::minimum())
            }
        };

        let partition_count =
            fetch_partition_count(&producer.producer, sink_id, &connection.topic).await?;
        update_partition_count(partition_count);

        Ok((producer, resume_upper))
    }
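
    /// Runs the given closure against a clone of the underlying producer on a blocking task,
    /// checking the SSH tunnel status when the closure returns an error.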
    async fn spawn_blocking<F, R>(&self, f: F) -> Result<R, ContextCreationError>
    where
        F: FnOnce(
                ThreadedProducer<TunnelingClientContext<MzClientContext>>,
            ) -> Result<R, KafkaError>
            + Send
            + 'static,
        R: Send + 'static,
    {
        let producer = self.producer.clone();
        task::spawn_blocking(|| &self.task_name, move || f(producer))
            .await
            .unwrap()
            .check_ssh_status(self.producer.context())
    }

    async fn begin_transaction(&self) -> Result<(), ContextCreationError> {
        self.spawn_blocking(|p| p.begin_transaction()).await
    }
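
    /// Stages a message for the data topic as part of the current transaction. The update's
    /// timestamp is attached as a `materialize-timestamp` header and the message hash determines
    /// the target partition. Panics if the update's diff is not 1.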
    fn send(
        &mut self,
        message: &KafkaMessage,
        time: Timestamp,
        diff: Diff,
    ) -> Result<(), KafkaError> {
        assert_eq!(diff, Diff::ONE, "invalid sink update");

        let mut headers = OwnedHeaders::new().insert(Header {
            key: "materialize-timestamp",
            value: Some(time.to_string().as_bytes()),
        });
        for header in &message.headers {
            // Headers with the `materialize-` prefix are reserved for Materialize's own use, so
            // user-provided headers with that prefix are skipped.
            if header.key.starts_with("materialize-") {
                continue;
            }

            headers = headers.insert(Header {
                key: header.key.as_str(),
                value: header.value.as_ref(),
            });
        }

        let pc = self
            .partition_count
            .load(std::sync::atomic::Ordering::SeqCst);
        let partition = Some(i32::try_from(message.hash % pc).unwrap());

        let record = BaseRecord {
            topic: &self.data_topic,
            key: message.key.as_ref(),
            payload: message.value.as_ref(),
            headers: Some(headers),
            partition,
            timestamp: None,
            delivery_opaque: (),
        };
        let key_size = message.key.as_ref().map(|k| k.len()).unwrap_or(0);
        let value_size = message.value.as_ref().map(|k| k.len()).unwrap_or(0);
        let headers_size = message
            .headers
            .iter()
            .map(|h| h.key.len() + h.value.as_ref().map(|v| v.len()).unwrap_or(0))
            .sum::<usize>();
        let record_size = u64::cast_from(key_size + value_size + headers_size);
        self.statistics.inc_messages_staged_by(1);
        self.staged_messages += 1;
        self.statistics.inc_bytes_staged_by(record_size);
        self.staged_bytes += record_size;
        self.producer.send(record).map_err(|(e, _)| e)
    }
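
    /// Appends a progress record with the given `upper` to the progress topic and commits the
    /// current transaction, updating committed statistics on success and aborting the
    /// transaction when the commit fails with an abortable error.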
    async fn commit_transaction(
        &mut self,
        upper: Antichain<Timestamp>,
    ) -> Result<(), ContextCreationError> {
        let progress = ProgressRecord {
            frontier: upper,
            version: self.sink_version,
        };
        let payload = serde_json::to_vec(&progress).expect("infallible");
        let record = BaseRecord::to(&self.progress_topic)
            .payload(&payload)
            .key(&self.progress_key);
        self.producer.send(record).map_err(|(e, _)| e)?;

        fail::fail_point!("kafka_sink_commit_transaction");

        let timeout = self.transaction_timeout;
        match self
            .spawn_blocking(move |p| p.commit_transaction(timeout))
            .await
        {
            Ok(()) => {
                self.statistics
                    .inc_messages_committed_by(self.staged_messages);
                self.statistics.inc_bytes_committed_by(self.staged_bytes);
                self.staged_messages = 0;
                self.staged_bytes = 0;
                Ok(())
            }
            Err(ContextCreationError::KafkaError(KafkaError::Transaction(err))) => {
                if err.txn_requires_abort() {
                    let timeout = self.socket_timeout;
                    self.spawn_blocking(move |p| p.abort_transaction(timeout))
                        .await?;
                }
                Err(ContextCreationError::KafkaError(KafkaError::Transaction(
                    err,
                )))
            }
            Err(err) => Err(err),
        }
    }
}
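
/// Listens for statistics updates from librdkafka and forwards them to the sink's Prometheus
/// metrics, summing the per-broker counters.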
async fn collect_statistics(
    mut receiver: watch::Receiver<Statistics>,
    metrics: Arc<KafkaSinkMetrics>,
) {
    let mut outbuf_cnt: i64 = 0;
    let mut outbuf_msg_cnt: i64 = 0;
    let mut waitresp_cnt: i64 = 0;
    let mut waitresp_msg_cnt: i64 = 0;
    let mut txerrs: u64 = 0;
    let mut txretries: u64 = 0;
    let mut req_timeouts: u64 = 0;
    let mut connects: i64 = 0;
    let mut disconnects: i64 = 0;
    while receiver.changed().await.is_ok() {
        let stats = receiver.borrow();
        for broker in stats.brokers.values() {
            outbuf_cnt += broker.outbuf_cnt;
            outbuf_msg_cnt += broker.outbuf_msg_cnt;
            waitresp_cnt += broker.waitresp_cnt;
            waitresp_msg_cnt += broker.waitresp_msg_cnt;
            txerrs += broker.txerrs;
            txretries += broker.txretries;
            req_timeouts += broker.req_timeouts;
            connects += broker.connects.unwrap_or(0);
            disconnects += broker.disconnects.unwrap_or(0);
        }
        metrics.rdkafka_msg_cnt.set(stats.msg_cnt);
        metrics.rdkafka_msg_size.set(stats.msg_size);
        metrics.rdkafka_txmsgs.set(stats.txmsgs);
        metrics.rdkafka_txmsg_bytes.set(stats.txmsg_bytes);
        metrics.rdkafka_tx.set(stats.tx);
        metrics.rdkafka_tx_bytes.set(stats.tx_bytes);
        metrics.rdkafka_outbuf_cnt.set(outbuf_cnt);
        metrics.rdkafka_outbuf_msg_cnt.set(outbuf_msg_cnt);
        metrics.rdkafka_waitresp_cnt.set(waitresp_cnt);
        metrics.rdkafka_waitresp_msg_cnt.set(waitresp_msg_cnt);
        metrics.rdkafka_txerrs.set(txerrs);
        metrics.rdkafka_txretries.set(txretries);
        metrics.rdkafka_req_timeouts.set(req_timeouts);
        metrics.rdkafka_connects.set(connects);
        metrics.rdkafka_disconnects.set(disconnects);
    }
}
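
/// A fully encoded update, ready to be handed to the Kafka producer.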
#[derive(Debug, Clone, Serialize, Deserialize)]
struct KafkaMessage {
    /// The hash used to assign the message to a partition.
    hash: u64,
    /// The encoded message key, if any.
    key: Option<Vec<u8>>,
    /// The encoded message value, if any.
    value: Option<Vec<u8>>,
    /// The headers to attach to the message.
    headers: Vec<KafkaHeader>,
}
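
/// A header to attach to a Kafka message.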
#[derive(Debug, Clone, Serialize, Deserialize)]
struct KafkaHeader {
    /// The header key.
    key: String,
    /// The header value, if any.
    value: Option<Vec<u8>>,
}
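
/// Sinks a collection of encoded messages to Kafka.
///
/// Only the worker chosen by hashing the sink id actively produces. It writes all updates for a
/// completed timestamp inside a single Kafka transaction, records the resulting upper in the
/// progress topic, and advances the upper of the sink's persist shard to match.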
fn sink_collection<G: Scope<Timestamp = Timestamp>>(
    name: String,
    input: &VecCollection<G, KafkaMessage, Diff>,
    sink_id: GlobalId,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    metrics: KafkaSinkMetrics,
    statistics: SinkStatistics,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
) -> (Stream<G, HealthStatusMessage>, PressOnDropButton) {
    let scope = input.scope();
    let mut builder = AsyncOperatorBuilder::new(name.clone(), input.inner.scope());

    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let buffer_min_capacity =
        KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS.handle(storage_configuration.config_set());

    let mut input = builder.new_disconnected_input(&input.inner, Exchange::new(move |_| hashed_id));

    let as_of = sink.as_of.clone();
    let sink_version = sink.version;
    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            if !is_active_worker {
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            fail::fail_point!("kafka_sink_creation_error", |_| Err(
                ContextCreationError::Other(anyhow::anyhow!("synthetic error"))
            ));

            let mut write_handle = write_handle.await?;

            let metrics = Arc::new(metrics);

            let (mut producer, resume_upper) = TransactionalProducer::new(
                sink_id,
                &connection,
                &storage_configuration,
                Arc::clone(&metrics),
                statistics,
                sink_version,
            )
            .await?;

            let overcompacted =
                // The sink has made progress in a previous run...
                *resume_upper != [Timestamp::minimum()] &&
                // ...but the input's as_of is not at or before the resume upper, i.e. the input
                // has been compacted past the point from which we must resume.
                !PartialOrder::less_equal(&as_of, &resume_upper);
            if overcompacted {
                let err = format!(
                    "{name}: input compacted past resume upper: as_of {}, resume_upper: {}",
                    as_of.pretty(),
                    resume_upper.pretty()
                );
                error!("{err}");
                return Err(anyhow!("{err}").into());
            }

            info!(
                "{name}: as_of: {}, resume upper: {}",
                as_of.pretty(),
                resume_upper.pretty()
            );

            // An empty resume upper means the sink has already produced all the data it ever
            // will, so there is nothing left to do.
            let Some(mut upper) = resume_upper.clone().into_option() else {
                write_frontier.borrow_mut().clear();
                return Ok(());
            };

            let mut deferred_updates = vec![];
            let mut extra_updates = vec![];
            // True iff we have begun a transaction that has not yet been committed.
            let mut transaction_begun = false;
            while let Some(event) = input.next().await {
                match event {
                    Event::Data(_cap, batch) => {
                        for (message, time, diff) in batch {
                            // Updates at times earlier than `upper` were already committed and
                            // are skipped; updates at exactly `upper` are staged in the current
                            // transaction; later updates are buffered until the frontier
                            // advances past them.
                            match upper.cmp(&time) {
                                Ordering::Less => deferred_updates.push((message, time, diff)),
                                Ordering::Equal => {
                                    if !transaction_begun {
                                        producer.begin_transaction().await?;
                                        transaction_begun = true;
                                    }
                                    producer.send(&message, time, diff)?;
                                }
                                Ordering::Greater => continue,
                            }
                        }
                    }
                    Event::Progress(progress) => {
                        // Don't commit anything until the frontier has passed both the resume
                        // upper and the as_of.
                        if !PartialOrder::less_equal(&resume_upper, &progress) {
                            continue;
                        }
                        if !as_of.iter().all(|t| !progress.less_equal(t)) {
                            continue;
                        }
                        if !transaction_begun {
                            producer.begin_transaction().await?;
                        }

                        // Shrink the buffers toward the configured minimum capacity, then
                        // extract all deferred updates that are now beneath the frontier and
                        // send them in time order.
                        deferred_updates.shrink_to(buffer_min_capacity.get());
                        extra_updates.extend(
                            deferred_updates
                                .extract_if(.., |(_, time, _)| !progress.less_equal(time)),
                        );
                        extra_updates.sort_unstable_by(|a, b| a.1.cmp(&b.1));

                        extra_updates.shrink_to(buffer_min_capacity.get());
                        for (message, time, diff) in extra_updates.drain(..) {
                            producer.send(&message, time, diff)?;
                        }

                        debug!("{name}: committing transaction for {}", progress.pretty());
                        producer.commit_transaction(progress.clone()).await?;
                        transaction_begun = false;
                        // Record the committed progress in the sink's persist shard by appending
                        // empty batches, retrying on upper mismatches until the shard's upper
                        // has reached the committed frontier.
                        let mut expect_upper = write_handle.shared_upper();
                        loop {
                            if PartialOrder::less_equal(&progress, &expect_upper) {
                                break;
                            }
                            const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                            match write_handle
                                .compare_and_append(EMPTY, expect_upper, progress.clone())
                                .await
                                .expect("valid usage")
                            {
                                Ok(()) => break,
                                Err(mismatch) => {
                                    expect_upper = mismatch.current;
                                }
                            }
                        }
                        write_frontier.borrow_mut().clone_from(&progress);
                        match progress.into_option() {
                            Some(new_upper) => upper = new_upper,
                            None => break,
                        }
                    }
                }
            }
            Ok(())
        })
    });

    let statuses = errors.map(|error: Rc<ContextCreationError>| {
        let hint = match *error {
            ContextCreationError::KafkaError(KafkaError::Transaction(ref e)) => {
                if e.is_retriable() && e.code() == RDKafkaErrorCode::OperationTimedOut {
                    let hint = "If you're running a single Kafka broker, ensure that the configs \
                        transaction.state.log.replication.factor, transaction.state.log.min.isr, \
                        and offsets.topic.replication.factor are set to 1 on the broker";
                    Some(hint.to_owned())
                } else {
                    None
                }
            }
            _ => None,
        };

        HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), hint),
            namespace: if matches!(*error, ContextCreationError::Ssh(_)) {
                StatusNamespace::Ssh
            } else {
                StatusNamespace::Kafka
            },
        }
    });

    (statuses, button.press_on_drop())
}
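
/// Determines the latest progress record for the given sink by reading the sink's progress topic
/// with a read-committed consumer until the high watermark is reached. Returns `None` if the
/// progress topic does not exist or contains no record for this sink.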
async fn determine_sink_progress(
    sink_id: GlobalId,
    connection: &KafkaSinkConnection,
    storage_configuration: &StorageConfiguration,
    metrics: Arc<KafkaSinkMetrics>,
) -> Result<Option<ProgressRecord>, ContextCreationError> {
    let TimeoutConfig {
        fetch_metadata_timeout,
        progress_record_fetch_timeout,
        ..
    } = storage_configuration.parameters.kafka_timeout_config;

    let client_id = connection.client_id(
        storage_configuration.config_set(),
        &storage_configuration.connection_context,
        sink_id,
    );
    let group_id = connection.progress_group_id(&storage_configuration.connection_context, sink_id);
    let progress_topic = connection
        .progress_topic(&storage_configuration.connection_context)
        .into_owned();
    let progress_topic_options = &connection.connection.progress_topic_options;
    let progress_key = ProgressKey::new(sink_id);

    let common_options = btreemap! {
        "group.id" => group_id,
        "client.id" => client_id,
        "enable.auto.commit" => "false".into(),
        "auto.offset.reset" => "earliest".into(),
        "enable.partition.eof" => "true".into(),
    };

    let progress_client_read_committed: BaseConsumer<_> = {
        let mut opts = common_options.clone();
        opts.insert("isolation.level", "read_committed".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    let progress_client_read_uncommitted: BaseConsumer<_> = {
        let mut opts = common_options;
        opts.insert("isolation.level", "read_uncommitted".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    let ctx = Arc::clone(progress_client_read_committed.client().context());

    let ensure_topic_config =
        match &*SINK_ENSURE_TOPIC_CONFIG.get(storage_configuration.config_set()) {
            "skip" => EnsureTopicConfig::Skip,
            "check" => EnsureTopicConfig::Check,
            "alter" => EnsureTopicConfig::Alter,
            _ => {
                tracing::warn!(
                    topic = progress_topic,
                    "unexpected value for ensure-topic-config; skipping checks"
                );
                EnsureTopicConfig::Skip
            }
        };
    mz_storage_client::sink::ensure_kafka_topic(
        connection,
        storage_configuration,
        &progress_topic,
        progress_topic_options,
        ensure_topic_config,
    )
    .await
    .add_context("error registering kafka progress topic for sink")?;

    // If this future is dropped while the blocking task below is still running, dropping
    // `parent_token` lets the task observe the cancellation and bail out early.
    let parent_token = Arc::new(());
    let child_token = Arc::downgrade(&parent_token);
    let task_name = format!("get_latest_ts:{sink_id}");
    let sink_progress_search = SINK_PROGRESS_SEARCH.get(storage_configuration.config_set());
    let result = task::spawn_blocking(|| task_name, move || {
        let progress_topic = progress_topic.as_ref();
        // The progress topic is expected to have exactly one partition, so that all progress
        // records for this sink are totally ordered.
        let partitions = match mz_kafka_util::client::get_partitions(
            progress_client_read_committed.client(),
            progress_topic,
            fetch_metadata_timeout,
        ) {
            Ok(partitions) => partitions,
            Err(GetPartitionsError::TopicDoesNotExist) => {
                // A missing progress topic means no progress has ever been recorded for this
                // sink.
                return Ok(None);
            }
            e => e.with_context(|| {
                format!(
                    "Unable to fetch metadata about progress topic {}",
                    progress_topic
                )
            })?,
        };
        if partitions.len() != 1 {
            bail!(
                "Progress topic {} should contain a single partition, but instead contains {} partitions",
                progress_topic, partitions.len(),
            );
        }
        let partition = partitions.into_element();

        metrics.consumed_progress_records.set(0);

        // Fetch the watermarks of the progress topic. Note that this uses the read-uncommitted
        // consumer, so the high watermark also accounts for records written by transactions
        // that have not (yet) been committed.
        let (lo, hi) = progress_client_read_uncommitted
            .fetch_watermarks(progress_topic, partition, fetch_metadata_timeout)
            .map_err(|e| {
                anyhow!(
                    "Failed to fetch metadata while reading from progress topic: {}",
                    e
                )
            })?;

        // Candidate offsets at which to start the search. By default we scan from the
        // beginning, but when progress search is enabled we first try a few offsets closer to
        // the high watermark, so that large progress topics don't have to be scanned in full.
        let mut start_indices = vec![lo];
        if sink_progress_search {
            let mut lookback = hi.saturating_sub(lo) / 10;
            while lookback >= 20_000 {
                start_indices.push(hi - lookback);
                lookback /= 10;
            }
        }
        for lo in start_indices.into_iter().rev() {
            if let Some(found) = progress_search(
                &progress_client_read_committed,
                progress_record_fetch_timeout,
                progress_topic,
                partition,
                lo,
                hi,
                progress_key.clone(),
                Weak::clone(&child_token),
                Arc::clone(&metrics)
            )? {
                return Ok(Some(found));
            }
        }
        Ok(None)
    }).await.unwrap().check_ssh_status(&ctx);
    drop(parent_token);
    result
}
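
/// Scans the progress topic partition from offset `lo` towards the high watermark `hi` and
/// returns the last progress record found for `progress_key`, erroring if a recorded frontier
/// ever regresses. Returns `None` if no record for the key is seen in that range.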
fn progress_search<C: ConsumerContext + 'static>(
    progress_client_read_committed: &BaseConsumer<C>,
    progress_record_fetch_timeout: Duration,
    progress_topic: &str,
    partition: i32,
    lo: i64,
    hi: i64,
    progress_key: ProgressKey,
    child_token: Weak<()>,
    metrics: Arc<KafkaSinkMetrics>,
) -> anyhow::Result<Option<ProgressRecord>> {
    // Seek the consumer to the starting offset of the search.
    let mut tps = TopicPartitionList::new();
    tps.add_partition(progress_topic, partition);
    tps.set_partition_offset(progress_topic, partition, Offset::Offset(lo))?;
    progress_client_read_committed
        .assign(&tps)
        .with_context(|| {
            format!(
                "Error seeking in progress topic {}:{}",
                progress_topic, partition
            )
        })?;

    let get_position = || {
        if child_token.strong_count() == 0 {
            bail!("operation cancelled");
        }
        let position = progress_client_read_committed
            .position()?
            .find_partition(progress_topic, partition)
            .ok_or_else(|| {
                anyhow!(
                    "No position info found for progress topic {}",
                    progress_topic
                )
            })?
            .offset();
        let position = match position {
            Offset::Offset(position) => position,
            // The consumer has not read any messages yet, so its position is still the starting
            // offset.
            Offset::Invalid => lo,
            _ => bail!(
                "Consumer::position returned offset of wrong type: {:?}",
                position
            ),
        };
        let outstanding = u64::try_from(std::cmp::max(0, hi - position)).unwrap();
        metrics.outstanding_progress_records.set(outstanding);
        Ok(position)
    };

    info!("fetching latest progress record for {progress_key}, lo/hi: {lo}/{hi}");

    // Read messages until the consumer's position reaches the high watermark, remembering the
    // most recent progress record written for our key.
    let mut last_progress: Option<ProgressRecord> = None;
    loop {
        let current_position = get_position()?;

        if current_position >= hi {
            break;
        }

        let message = match progress_client_read_committed.poll(progress_record_fetch_timeout) {
            Some(Ok(message)) => message,
            Some(Err(KafkaError::PartitionEOF(_))) => {
                // The consumer reached the end of the partition; loop around to re-check its
                // position against the high watermark.
                continue;
            }
            Some(Err(e)) => bail!("failed to fetch progress message {e}"),
            None => {
                bail!(
                    "timed out while waiting to reach high water mark of non-empty \
                     topic {progress_topic}:{partition}, lo/hi: {lo}/{hi}, current position: {current_position}"
                );
            }
        };

        if message.key() != Some(progress_key.to_bytes()) {
            // This progress record belongs to a different sink.
            continue;
        }

        metrics.consumed_progress_records.inc();

        let Some(payload) = message.payload() else {
            continue;
        };
        let progress = parse_progress_record(payload)?;

        match last_progress {
            Some(last_progress)
                if !PartialOrder::less_equal(&last_progress.frontier, &progress.frontier) =>
            {
                bail!(
                    "upper regressed in topic {progress_topic}:{partition} from {:?} to {:?}",
                    &last_progress.frontier,
                    &progress.frontier,
                );
            }
            _ => last_progress = Some(progress),
        }
    }

    Ok(last_progress)
}
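
/// The legacy format of progress records, kept so that progress written by earlier versions of
/// the sink can still be decoded.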
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct LegacyProgressRecord {
    // The double `Option` distinguishes a missing `timestamp` field (outer `None`) from an
    // explicit `null` (inner `None`).
    #[serde(default, deserialize_with = "deserialize_some")]
    pub timestamp: Option<Option<Timestamp>>,
}

/// Deserializes any present value into `Some(value)`, so that a missing field (the serde
/// default, `None`) can be told apart from a field that was present but `null` (`Some(None)`).
fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
where
    T: Deserialize<'de>,
    D: Deserializer<'de>,
{
    Deserialize::deserialize(deserializer).map(Some)
}
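
/// The format of progress records written to the progress topic: the sink's write frontier
/// together with the version of the sink that produced it.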
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ProgressRecord {
    #[serde(
        deserialize_with = "deserialize_frontier",
        serialize_with = "serialize_frontier"
    )]
    pub frontier: Antichain<Timestamp>,
    #[serde(default)]
    pub version: u64,
}

fn serialize_frontier<S>(frontier: &Antichain<Timestamp>, serializer: S) -> Result<S::Ok, S::Error>
where
    S: Serializer,
{
    Serialize::serialize(frontier.elements(), serializer)
}

fn deserialize_frontier<'de, D>(deserializer: D) -> Result<Antichain<Timestamp>, D::Error>
where
    D: Deserializer<'de>,
{
    let times: Vec<Timestamp> = Deserialize::deserialize(deserializer)?;
    Ok(Antichain::from(times))
}
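
/// Parses a progress record, accepting both the current format and the legacy
/// `LegacyProgressRecord` format, which is translated on the fly.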
fn parse_progress_record(payload: &[u8]) -> Result<ProgressRecord, anyhow::Error> {
    Ok(match serde_json::from_slice::<ProgressRecord>(payload) {
        Ok(progress) => progress,
        // If the current format fails to deserialize, fall back to the legacy format.
        Err(_) => match serde_json::from_slice::<LegacyProgressRecord>(payload) {
            Ok(LegacyProgressRecord {
                timestamp: Some(Some(time)),
            }) => ProgressRecord {
                frontier: Antichain::from_elem(time.step_forward()),
                version: 0,
            },
            Ok(LegacyProgressRecord {
                timestamp: Some(None),
            }) => ProgressRecord {
                frontier: Antichain::new(),
                version: 0,
            },
            _ => match std::str::from_utf8(payload) {
                Ok(payload) => bail!("invalid progress record: {payload}"),
                Err(_) => bail!("invalid progress record bytes: {payload:?}"),
            },
        },
    })
}
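
/// Fetches the partition count of the given topic, erroring if the topic does not exist or
/// reports zero partitions.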
async fn fetch_partition_count(
    producer: &ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    sink_id: GlobalId,
    topic_name: &str,
) -> Result<u64, anyhow::Error> {
    let meta = task::spawn_blocking(|| format!("kafka_sink_fetch_partition_count:{sink_id}"), {
        let producer = producer.clone();
        move || {
            producer
                .client()
                .fetch_metadata(None, DEFAULT_FETCH_METADATA_TIMEOUT)
        }
    })
    .await
    .expect("spawning blocking task cannot fail")
    .check_ssh_status(producer.context())?;

    match meta.topics().iter().find(|t| t.name() == topic_name) {
        Some(topic) => {
            let partition_count = u64::cast_from(topic.partitions().len());
            if partition_count == 0 {
                bail!("topic {topic_name} has an impossible partition count of zero");
            }
            Ok(partition_count)
        }
        None => bail!("topic {topic_name} does not exist"),
    }
}
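
/// Periodically refreshes the partition count of the given topic and reports it through
/// `update_partition_count`. Errors are logged and the fetch is retried on the next tick.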
async fn fetch_partition_count_loop<F>(
    producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    sink_id: GlobalId,
    topic_name: String,
    interval: Duration,
    update_partition_count: Arc<F>,
) where
    F: Fn(u64),
{
    let mut interval = time::interval(interval);
    interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
    loop {
        interval.tick().await;
        match fetch_partition_count(&producer, sink_id, &topic_name).await {
            Ok(pc) => update_partition_count(pc),
            Err(e) => {
                warn!(%sink_id, "failed updating partition count: {e}");
                continue;
            }
        };
    }
}
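
/// Encodes a collection of `(key, value)` pairs into `KafkaMessage`s according to the sink's
/// envelope and formats, publishing Avro schemas to the schema registry when required. Also
/// evaluates the optional `PARTITION BY` expression and headers column for each update.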
fn encode_collection<G: Scope>(
    name: String,
    input: &VecCollection<G, (Option<Row>, DiffPair<Row>), Diff>,
    envelope: SinkEnvelope,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
) -> (
    VecCollection<G, KafkaMessage, Diff>,
    Stream<G, HealthStatusMessage>,
    PressOnDropButton,
) {
    let mut builder = AsyncOperatorBuilder::new(name, input.inner.scope());

    let (output, stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let mut input = builder.new_input_for(&input.inner, Pipeline, &output);

    let (button, errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let key_desc = connection
                .key_desc_and_indices
                .as_ref()
                .map(|(desc, _indices)| desc.clone());
            let value_desc = connection.value_desc;

            let key_encoder: Option<Box<dyn Encode>> =
                match (key_desc, connection.format.key_format) {
                    (Some(desc), Some(KafkaSinkFormatType::Bytes)) => {
                        Some(Box::new(BinaryEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Text)) => {
                        Some(Box::new(TextEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Json)) => {
                        Some(Box::new(JsonEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Avro {
                        schema,
                        compatibility_level,
                        csr_connection,
                    })) => {
                        // Publish the key schema to the schema registry before encoding any data.
                        let ccsr = csr_connection
                            .connect(&storage_configuration, InTask::Yes)
                            .await?;

                        let schema_id = mz_storage_client::sink::publish_kafka_schema(
                            ccsr,
                            format!("{}-key", connection.topic),
                            schema.clone(),
                            mz_ccsr::SchemaType::Avro,
                            compatibility_level,
                        )
                        .await
                        .context("error publishing kafka schemas for sink")?;

                        Some(Box::new(AvroEncoder::new(desc, false, &schema, schema_id)))
                    }
                    (None, None) => None,
                    (desc, format) => {
                        return Err(anyhow!(
                            "key_desc and key_format must be both set or both unset, but key_desc: {:?}, key_format: {:?}",
                            desc,
                            format
                        ))
                    }
                };

            let debezium = matches!(envelope, SinkEnvelope::Debezium);

            let value_encoder: Box<dyn Encode> = match connection.format.value_format {
                KafkaSinkFormatType::Bytes => Box::new(BinaryEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Text => Box::new(TextEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Json => Box::new(JsonEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level,
                    csr_connection,
                } => {
                    // Publish the value schema to the schema registry before encoding any data.
                    let ccsr = csr_connection
                        .connect(&storage_configuration, InTask::Yes)
                        .await?;

                    let schema_id = mz_storage_client::sink::publish_kafka_schema(
                        ccsr,
                        format!("{}-value", connection.topic),
                        schema.clone(),
                        mz_ccsr::SchemaType::Avro,
                        compatibility_level,
                    )
                    .await
                    .context("error publishing kafka schemas for sink")?;

                    Box::new(AvroEncoder::new(value_desc, debezium, &schema, schema_id))
                }
            };

            // Initialization is complete; from here on data is emitted using the capabilities of
            // incoming events, so the operator's own capability can be dropped.
            *capset = CapabilitySet::new();

            let mut row_buf = Row::default();
            let mut datums = DatumVec::new();

            while let Some(event) = input.next().await {
                if let Event::Data(cap, rows) = event {
                    for ((key, value), time, diff) in rows {
                        let mut hash = None;
                        let mut headers = vec![];
                        if connection.headers_index.is_some() || connection.partition_by.is_some() {
                            // Headers and the partition-by hash are computed from the row itself,
                            // using `after` if the update has one and `before` otherwise (e.g.
                            // for deletions).
                            let row = value
                                .after
                                .as_ref()
                                .or(value.before.as_ref())
                                .expect("one of before or after must be set");
                            let row = datums.borrow_with(row);

                            if let Some(i) = connection.headers_index {
                                headers = encode_headers(row[i]);
                            }

                            if let Some(partition_by) = &connection.partition_by {
                                hash = Some(evaluate_partition_by(partition_by, &row));
                            }
                        }
                        let (key, hash) = match key {
                            Some(key) => {
                                let key_encoder = key_encoder.as_ref().expect("key present");
                                let key = key_encoder.encode_unchecked(key);
                                let hash = hash.unwrap_or_else(|| key_encoder.hash(&key));
                                (Some(key), hash)
                            }
                            None => (None, hash.unwrap_or(0)),
                        };
                        let value = match envelope {
                            SinkEnvelope::Upsert => value.after,
                            SinkEnvelope::Debezium => {
                                dbz_format(&mut row_buf.packer(), value);
                                Some(row_buf.clone())
                            }
                        };
                        let value = value.map(|value| value_encoder.encode_unchecked(value));
                        let message = KafkaMessage {
                            hash,
                            key,
                            value,
                            headers,
                        };
                        output.give(&cap, (message, time, diff));
                    }
                }
            }
            Ok::<(), anyhow::Error>(())
        })
    });

    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Kafka,
    });

    (stream.as_collection(), statuses, button.press_on_drop())
}
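
/// Converts a map datum into Kafka headers. A null datum yields no headers; null map values
/// become headers without a value.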
fn encode_headers(datum: Datum) -> Vec<KafkaHeader> {
    let mut out = vec![];
    if datum.is_null() {
        return out;
    }
    for (key, value) in datum.unwrap_map().iter() {
        out.push(KafkaHeader {
            key: key.into(),
            value: match value {
                Datum::Null => None,
                Datum::String(s) => Some(s.as_bytes().to_vec()),
                Datum::Bytes(b) => Some(b.to_vec()),
                _ => panic!("encode_headers called with unexpected header value {value:?}"),
            },
        })
    }
    out
}
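
/// Evaluates a `PARTITION BY` expression against a row, returning the hash used for partition
/// assignment. Null results and evaluation errors map to a hash of 0.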
fn evaluate_partition_by(partition_by: &MirScalarExpr, row: &[Datum]) -> u64 {
    let temp_storage = RowArena::new();
    match partition_by.eval(row, &temp_storage) {
        Ok(Datum::UInt64(u)) => u,
        Ok(datum) => {
            soft_assert_or_log!(datum.is_null(), "unexpected partition_by result: {datum:?}");
            0
        }
        Err(_) => 0,
    }
}

#[cfg(test)]
mod test {
    use mz_ore::assert_err;

    use super::*;

    #[mz_ore::test]
    fn progress_record_migration() {
        assert_err!(parse_progress_record(b"{}"));

        assert_eq!(
            parse_progress_record(b"{\"timestamp\":1}").unwrap(),
            ProgressRecord {
                frontier: Antichain::from_elem(2.into()),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"timestamp\":null}").unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"frontier\":[1]}").unwrap(),
            ProgressRecord {
                frontier: Antichain::from_elem(1.into()),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"frontier\":[]}").unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 0,
            }
        );

        assert_eq!(
            parse_progress_record(b"{\"frontier\":[], \"version\": 42}").unwrap(),
            ProgressRecord {
                frontier: Antichain::new(),
                version: 42,
            }
        );

        assert_err!(parse_progress_record(b"{\"frontier\":null}"));
    }
}