1use std::cell::RefCell;
75use std::cmp::Ordering;
76use std::collections::BTreeMap;
77use std::future::Future;
78use std::rc::Rc;
79use std::sync::atomic::AtomicU64;
80use std::sync::{Arc, Weak};
81use std::time::Duration;
82
83use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
84use crate::metrics::sink::kafka::KafkaSinkMetrics;
85use crate::render::sinks::{PkViolationWarner, SinkBatchStream, SinkRender};
86use crate::statistics::SinkStatistics;
87use crate::storage_state::StorageState;
88use anyhow::{Context, anyhow, bail};
89use differential_dataflow::{AsCollection, Hashable, VecCollection};
90use futures::StreamExt;
91use maplit::btreemap;
92use mz_expr::MirScalarExpr;
93use mz_interchange::avro::AvroEncoder;
94use mz_interchange::encode::Encode;
95use mz_interchange::envelopes::{dbz_format, for_each_diff_pair};
96use mz_interchange::json::JsonEncoder;
97use mz_interchange::text_binary::{BinaryEncoder, TextEncoder};
98use mz_kafka_util::admin::EnsureTopicConfig;
99use mz_kafka_util::client::{
100 DEFAULT_FETCH_METADATA_TIMEOUT, GetPartitionsError, MzClientContext, TimeoutConfig,
101 TunnelingClientContext,
102};
103use mz_ore::cast::CastFrom;
104use mz_ore::collections::CollectionExt;
105use mz_ore::error::ErrorExt;
106use mz_ore::future::InTask;
107use mz_ore::soft_assert_or_log;
108use mz_ore::task::{self, AbortOnDropHandle};
109use mz_persist_client::Diagnostics;
110use mz_persist_client::write::WriteHandle;
111use mz_persist_types::codec_impls::UnitSchema;
112use mz_repr::{Datum, DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
113use mz_storage_client::sink::progress_key::ProgressKey;
114use mz_storage_types::StorageDiff;
115use mz_storage_types::configuration::StorageConfiguration;
116use mz_storage_types::controller::CollectionMetadata;
117use mz_storage_types::dyncfgs::{
118 KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS, KAFKA_SINK_BATCH_NUM_MESSAGES,
119 KAFKA_SINK_BATCH_SIZE, KAFKA_SINK_MESSAGE_MAX_BYTES, SINK_ENSURE_TOPIC_CONFIG,
120 SINK_PROGRESS_SEARCH,
121};
122use mz_storage_types::errors::{ContextCreationError, ContextCreationErrorExt, DataflowError};
123use mz_storage_types::sinks::{
124 KafkaSinkConnection, KafkaSinkFormatType, SinkEnvelope, StorageSinkDesc,
125};
126use mz_storage_types::sources::SourceData;
127use mz_timely_util::antichain::AntichainExt;
128use mz_timely_util::builder_async::{
129 Event, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
130};
131use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext};
132use rdkafka::error::KafkaError;
133use rdkafka::message::{Header, OwnedHeaders, ToBytes};
134use rdkafka::producer::{BaseRecord, Producer, ThreadedProducer};
135use rdkafka::types::RDKafkaErrorCode;
136use rdkafka::{Message, Offset, Statistics, TopicPartitionList};
137use serde::{Deserialize, Deserializer, Serialize, Serializer};
138use timely::PartialOrder;
139use timely::container::CapacityContainerBuilder;
140use timely::dataflow::StreamVec;
141use timely::dataflow::channels::pact::{Exchange, Pipeline};
142use timely::dataflow::operators::vec::{Map, ToStream};
143use timely::dataflow::operators::{CapabilitySet, Concatenate};
144use timely::progress::{Antichain, Timestamp as _};
145use tokio::sync::watch;
146use tokio::time::{self, MissedTickBehavior};
147use tracing::{debug, error, info, warn};
148
149impl<'scope> SinkRender<'scope> for KafkaSinkConnection {
150 fn get_key_indices(&self) -> Option<&[usize]> {
151 self.key_desc_and_indices
152 .as_ref()
153 .map(|(_desc, indices)| indices.as_slice())
154 }
155
156 fn get_relation_key_indices(&self) -> Option<&[usize]> {
157 self.relation_key_indices.as_deref()
158 }
159
160 fn render_sink(
161 &self,
162 storage_state: &mut StorageState,
163 sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
164 sink_id: GlobalId,
165 batches: SinkBatchStream<'scope>,
166 key_is_synthetic: bool,
167 _err_collection: VecCollection<'scope, Timestamp, DataflowError, Diff>,
170 ) -> (
171 StreamVec<'scope, Timestamp, HealthStatusMessage>,
172 Vec<PressOnDropButton>,
173 ) {
174 let scope = batches.scope();
175
176 let write_handle = {
177 let persist = Arc::clone(&storage_state.persist_clients);
178 let shard_meta = sink.to_storage_metadata.clone();
179 async move {
180 let client = persist.open(shard_meta.persist_location).await?;
181 let handle = client
182 .open_writer(
183 shard_meta.data_shard,
184 Arc::new(shard_meta.relation_desc),
185 Arc::new(UnitSchema),
186 Diagnostics::from_purpose("sink handle"),
187 )
188 .await?;
189 Ok(handle)
190 }
191 };
192
193 let write_frontier = Rc::new(RefCell::new(Antichain::from_elem(Timestamp::minimum())));
194 storage_state
195 .sink_write_frontiers
196 .insert(sink_id, Rc::clone(&write_frontier));
197
198 let (encoded, encode_status, encode_token) = encode_collection(
199 format!("kafka-{sink_id}-{}-encode", self.format.get_format_name()),
200 batches,
201 sink.envelope,
202 self.clone(),
203 storage_state.storage_configuration.clone(),
204 sink_id,
205 sink.from,
206 key_is_synthetic,
207 );
208
209 let metrics = storage_state.metrics.get_kafka_sink_metrics(sink_id);
210 let statistics = storage_state
211 .aggregated_statistics
212 .get_sink(&sink_id)
213 .expect("statistics initialized")
214 .clone();
215
216 let (sink_status, sink_token) = sink_collection(
217 format!("kafka-{sink_id}-sink"),
218 encoded,
219 sink_id,
220 self.clone(),
221 storage_state.storage_configuration.clone(),
222 sink,
223 metrics,
224 statistics,
225 write_handle,
226 write_frontier,
227 );
228
229 let running_status = Some(HealthStatusMessage {
230 id: None,
231 update: HealthStatusUpdate::Running,
232 namespace: StatusNamespace::Kafka,
233 })
234 .to_stream(scope);
235
236 let status = scope.concatenate([running_status, encode_status, sink_status]);
237
238 (status, vec![encode_token, sink_token])
239 }
240}
241
/// A Kafka producer that stages data messages inside a transaction and commits
/// them together with a progress record.
struct TransactionalProducer {
    /// The name used for the blocking tasks spawned by `spawn_blocking`.
    task_name: String,
    /// The topic that data messages are sent to.
    data_topic: String,
    /// The topic that progress records are sent to when committing.
    progress_topic: String,
    /// The key under which this sink's progress records are written.
    progress_key: ProgressKey,
    /// The version of this sink, recorded in every progress record.
    sink_version: u64,
    /// The most recently observed partition count of the data topic, shared
    /// with the background task that refreshes it.
    partition_count: Arc<AtomicU64>,
    /// Handle to the background task refreshing `partition_count`; the task is
    /// aborted when this handle is dropped.
    _partition_count_task: AbortOnDropHandle<()>,
    /// The underlying rdkafka producer.
    producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
    /// Statistics handle updated as messages are staged and committed.
    statistics: SinkStatistics,
    /// Number of messages staged since the last successful commit.
    staged_messages: u64,
    /// Number of bytes staged since the last successful commit.
    staged_bytes: u64,
    /// Timeout used when initializing and aborting transactions.
    socket_timeout: Duration,
    /// Timeout used when committing transactions; also passed to librdkafka as
    /// `transaction.timeout.ms`.
    transaction_timeout: Duration,
}
272
impl TransactionalProducer {
    /// Creates a transactional producer for the sink `sink_id` and determines
    /// the frontier from which the sink should resume.
    ///
    /// On success returns the producer together with the resume upper: the
    /// frontier of the latest progress record found for this sink, or the
    /// minimum timestamp if no progress record exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the Kafka client cannot be created, if
    /// `init_transactions` fails, if the progress topic cannot be read, if a
    /// progress record written by a newer sink version is found, or if the
    /// partition count of the data topic cannot be fetched.
    async fn new(
        sink_id: GlobalId,
        connection: &KafkaSinkConnection,
        storage_configuration: &StorageConfiguration,
        metrics: Arc<KafkaSinkMetrics>,
        statistics: SinkStatistics,
        sink_version: u64,
    ) -> Result<(Self, Antichain<mz_repr::Timestamp>), ContextCreationError> {
        let client_id = connection.client_id(
            storage_configuration.config_set(),
            &storage_configuration.connection_context,
            sink_id,
        );
        let transactional_id =
            connection.transactional_id(&storage_configuration.connection_context, sink_id);

        let timeout_config = &storage_configuration.parameters.kafka_timeout_config;
        // librdkafka producer options; see the librdkafka configuration
        // reference for the exact semantics of each key.
        let mut options = BTreeMap::new();
        options.insert("enable.idempotence", "true".into());
        options.insert(
            "compression.type",
            connection.compression_type.to_librdkafka_option().into(),
        );
        // Raise the limits on librdkafka's internal message queue.
        // NOTE(review): 2147483647 appears to be the maximum accepted value
        // for the kbytes limit and 0 to disable the message count limit;
        // confirm against the librdkafka configuration reference.
        options.insert("queue.buffering.max.kbytes", "2147483647".into());
        options.insert("queue.buffering.max.messages", "0".into());
        // Let librdkafka buffer messages for up to 10ms before sending.
        options.insert("queue.buffering.max.ms", format!("{}", 10));
        options.insert(
            "transaction.timeout.ms",
            format!("{}", timeout_config.transaction_timeout.as_millis()),
        );
        options.insert("transactional.id", transactional_id);
        options.insert("client.id", client_id);
        // Ask librdkafka to report statistics once a second; they are consumed
        // by `collect_statistics` below.
        options.insert("statistics.interval.ms", "1000".into());
        options.insert(
            "message.max.bytes",
            format!(
                "{}",
                KAFKA_SINK_MESSAGE_MAX_BYTES.get(storage_configuration.config_set())
            ),
        );
        options.insert(
            "batch.size",
            format!(
                "{}",
                KAFKA_SINK_BATCH_SIZE.get(storage_configuration.config_set())
            ),
        );
        options.insert(
            "batch.num.messages",
            format!(
                "{}",
                KAFKA_SINK_BATCH_NUM_MESSAGES.get(storage_configuration.config_set())
            ),
        );

        let ctx = MzClientContext::default();

        // Forward librdkafka statistics into our metrics. The task's handle is
        // not retained; `collect_statistics` returns once the statistics
        // channel reports that it is closed.
        let stats_receiver = ctx.subscribe_statistics();
        let task_name = format!("kafka_sink_metrics_collector:{sink_id}");
        task::spawn(
            || &task_name,
            collect_statistics(stats_receiver, Arc::clone(&metrics)),
        );

        let producer: ThreadedProducer<_> = connection
            .connection
            .create_with_context(storage_configuration, ctx, &options, InTask::Yes)
            .await?;

        // The partition count starts at zero and is set to a real value
        // before this function returns successfully (see the end of this
        // function).
        let partition_count = Arc::new(AtomicU64::new(0));
        // Callback that records a new partition count both in the shared
        // atomic and in the metrics.
        let update_partition_count = {
            let partition_count = Arc::clone(&partition_count);
            let metrics = Arc::clone(&metrics);
            Arc::new(move |pc| {
                partition_count.store(pc, std::sync::atomic::Ordering::SeqCst);
                metrics.partition_count.set(pc);
            })
        };

        // Periodically refresh the partition count in the background. The
        // task is aborted when the producer is dropped.
        let partition_count_task = task::spawn(
            || format!("kafka_sink_producer_fetch_metadata_loop:{sink_id}"),
            fetch_partition_count_loop(
                producer.clone(),
                sink_id,
                connection.topic.clone(),
                connection.topic_metadata_refresh_interval,
                Arc::clone(&update_partition_count),
            ),
        );

        let task_name = format!("kafka_sink_producer:{sink_id}");
        let progress_key = ProgressKey::new(sink_id);

        let producer = Self {
            task_name,
            data_topic: connection.topic.clone(),
            partition_count,
            _partition_count_task: partition_count_task.abort_on_drop(),
            progress_topic: connection
                .progress_topic(&storage_configuration.connection_context)
                .into_owned(),
            progress_key,
            sink_version,
            producer,
            statistics,
            staged_messages: 0,
            staged_bytes: 0,
            socket_timeout: timeout_config.socket_timeout,
            transaction_timeout: timeout_config.transaction_timeout,
        };

        // Initialize transactions *before* reading the progress topic.
        // NOTE(review): per Kafka's transactional producer semantics this
        // should fence out older producers sharing our `transactional.id`, so
        // the progress read below is not raced by a stale writer; confirm.
        let timeout = timeout_config.socket_timeout;
        producer
            .spawn_blocking(move |p| p.init_transactions(timeout))
            .await?;

        let progress = determine_sink_progress(
            sink_id,
            connection,
            storage_configuration,
            Arc::clone(&metrics),
        )
        .await?;

        let resume_upper = match progress {
            Some(progress) => {
                // A progress record with a higher version means a newer
                // incarnation of this sink has already written; refuse to
                // continue.
                if sink_version < progress.version {
                    return Err(ContextCreationError::Other(anyhow!(
                        "Fenced off by newer version of the sink. ours={} theirs={}",
                        sink_version,
                        progress.version
                    )));
                }
                progress.frontier
            }
            None => {
                // No progress record was found for this sink: run
                // `ensure_kafka_topic` for the data topic (skipping the topic
                // config check) and start from the minimum timestamp.
                mz_storage_client::sink::ensure_kafka_topic(
                    connection,
                    storage_configuration,
                    &connection.topic,
                    &connection.topic_options,
                    EnsureTopicConfig::Skip,
                )
                .await?;
                Antichain::from_elem(Timestamp::minimum())
            }
        };

        // Fetch the partition count once synchronously so that `send` never
        // observes the initial value of zero.
        let partition_count =
            fetch_partition_count(&producer.producer, sink_id, &connection.topic).await?;
        update_partition_count(partition_count);

        Ok((producer, resume_upper))
    }

    /// Runs the blocking operation `f` against a clone of the producer on a
    /// blocking task and awaits its result, passing it through
    /// `check_ssh_status` with the producer's client context.
    async fn spawn_blocking<F, R>(&self, f: F) -> Result<R, ContextCreationError>
    where
        F: FnOnce(
                ThreadedProducer<TunnelingClientContext<MzClientContext>>,
            ) -> Result<R, KafkaError>
            + Send
            + 'static,
        R: Send + 'static,
    {
        let producer = self.producer.clone();
        task::spawn_blocking(|| &self.task_name, move || f(producer))
            .await
            .check_ssh_status(self.producer.context())
    }

    /// Begins a new Kafka transaction.
    async fn begin_transaction(&self) -> Result<(), ContextCreationError> {
        self.spawn_blocking(|p| p.begin_transaction()).await
    }

    /// Stages `message` for sending to the data topic.
    ///
    /// The message is tagged with a `materialize-timestamp` header carrying
    /// `time`, and is routed to the partition `message.hash` modulo the
    /// current partition count.
    ///
    /// # Panics
    ///
    /// Panics if `diff` is not `Diff::ONE`, or if the computed partition does
    /// not fit in an `i32`.
    fn send(
        &mut self,
        message: &KafkaMessage,
        time: Timestamp,
        diff: Diff,
    ) -> Result<(), KafkaError> {
        assert_eq!(diff, Diff::ONE, "invalid sink update");

        let mut headers = OwnedHeaders::new().insert(Header {
            key: "materialize-timestamp",
            value: Some(time.to_string().as_bytes()),
        });
        for header in &message.headers {
            // Headers using the `materialize-` prefix are dropped rather than
            // forwarded, so they cannot collide with the header set above.
            if header.key.starts_with("materialize-") {
                continue;
            }

            headers = headers.insert(Header {
                key: header.key.as_str(),
                value: header.value.as_ref(),
            });
        }

        let pc = self
            .partition_count
            .load(std::sync::atomic::Ordering::SeqCst);
        let partition = Some(i32::try_from(message.hash % pc).unwrap());

        let record = BaseRecord {
            topic: &self.data_topic,
            key: message.key.as_ref(),
            payload: message.value.as_ref(),
            headers: Some(headers),
            partition,
            timestamp: None,
            delivery_opaque: (),
        };
        // The recorded size covers the key, the value, and the headers as
        // supplied in `message` (including any that were skipped above, and
        // excluding the `materialize-timestamp` header).
        let key_size = message.key.as_ref().map(|k| k.len()).unwrap_or(0);
        let value_size = message.value.as_ref().map(|k| k.len()).unwrap_or(0);
        let headers_size = message
            .headers
            .iter()
            .map(|h| h.key.len() + h.value.as_ref().map(|v| v.len()).unwrap_or(0))
            .sum::<usize>();
        let record_size = u64::cast_from(key_size + value_size + headers_size);
        // Note that the staged counters are bumped before the send result is
        // known.
        self.statistics.inc_messages_staged_by(1);
        self.staged_messages += 1;
        self.statistics.inc_bytes_staged_by(record_size);
        self.staged_bytes += record_size;
        self.producer.send(record).map_err(|(e, _)| e)
    }

    /// Commits the current transaction after writing a progress record for
    /// `upper` to the progress topic.
    ///
    /// On success the staged message and byte counts are added to the
    /// committed statistics and reset. If the commit fails with a transaction
    /// error that requires an abort, one abort attempt is made before the
    /// original error is returned.
    async fn commit_transaction(
        &mut self,
        upper: Antichain<Timestamp>,
    ) -> Result<(), ContextCreationError> {
        let progress = ProgressRecord {
            frontier: upper,
            version: self.sink_version,
        };
        let payload = serde_json::to_vec(&progress).expect("infallible");
        // The progress record is sent through the same producer as the data
        // messages, before the commit below.
        let record = BaseRecord::to(&self.progress_topic)
            .payload(&payload)
            .key(&self.progress_key);
        self.producer.send(record).map_err(|(e, _)| e)?;

        fail::fail_point!("kafka_sink_commit_transaction");

        let timeout = self.transaction_timeout;
        match self
            .spawn_blocking(move |p| p.commit_transaction(timeout))
            .await
        {
            Ok(()) => {
                self.statistics
                    .inc_messages_committed_by(self.staged_messages);
                self.statistics.inc_bytes_committed_by(self.staged_bytes);
                self.staged_messages = 0;
                self.staged_bytes = 0;
                Ok(())
            }
            Err(ContextCreationError::KafkaError(KafkaError::Transaction(err))) => {
                // Attempt to abort the transaction when the error says so. A
                // failure of the abort itself is returned in place of the
                // commit error.
                if err.txn_requires_abort() {
                    let timeout = self.socket_timeout;
                    self.spawn_blocking(move |p| p.abort_transaction(timeout))
                        .await?;
                }
                Err(ContextCreationError::KafkaError(KafkaError::Transaction(
                    err,
                )))
            }
            Err(err) => Err(err),
        }
    }
}
592
593async fn collect_statistics(
595 mut receiver: watch::Receiver<Statistics>,
596 metrics: Arc<KafkaSinkMetrics>,
597) {
598 let mut outbuf_cnt: i64 = 0;
599 let mut outbuf_msg_cnt: i64 = 0;
600 let mut waitresp_cnt: i64 = 0;
601 let mut waitresp_msg_cnt: i64 = 0;
602 let mut txerrs: u64 = 0;
603 let mut txretries: u64 = 0;
604 let mut req_timeouts: u64 = 0;
605 let mut connects: i64 = 0;
606 let mut disconnects: i64 = 0;
607 while receiver.changed().await.is_ok() {
608 let stats = receiver.borrow();
609 for broker in stats.brokers.values() {
610 outbuf_cnt += broker.outbuf_cnt;
611 outbuf_msg_cnt += broker.outbuf_msg_cnt;
612 waitresp_cnt += broker.waitresp_cnt;
613 waitresp_msg_cnt += broker.waitresp_msg_cnt;
614 txerrs += broker.txerrs;
615 txretries += broker.txretries;
616 req_timeouts += broker.req_timeouts;
617 connects += broker.connects.unwrap_or(0);
618 disconnects += broker.disconnects.unwrap_or(0);
619 }
620 metrics.rdkafka_msg_cnt.set(stats.msg_cnt);
621 metrics.rdkafka_msg_size.set(stats.msg_size);
622 metrics.rdkafka_txmsgs.set(stats.txmsgs);
623 metrics.rdkafka_txmsg_bytes.set(stats.txmsg_bytes);
624 metrics.rdkafka_tx.set(stats.tx);
625 metrics.rdkafka_tx_bytes.set(stats.tx_bytes);
626 metrics.rdkafka_outbuf_cnt.set(outbuf_cnt);
627 metrics.rdkafka_outbuf_msg_cnt.set(outbuf_msg_cnt);
628 metrics.rdkafka_waitresp_cnt.set(waitresp_cnt);
629 metrics.rdkafka_waitresp_msg_cnt.set(waitresp_msg_cnt);
630 metrics.rdkafka_txerrs.set(txerrs);
631 metrics.rdkafka_txretries.set(txretries);
632 metrics.rdkafka_req_timeouts.set(req_timeouts);
633 metrics.rdkafka_connects.set(connects);
634 metrics.rdkafka_disconnects.set(disconnects);
635 }
636}
637
/// An encoded message that is ready to be sent to the data topic.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct KafkaMessage {
    /// A hash used to pick the target partition: the producer sends the
    /// message to partition `hash % partition_count`.
    hash: u64,
    /// The encoded message key, if any.
    key: Option<Vec<u8>>,
    /// The encoded message payload, if any.
    value: Option<Vec<u8>>,
    /// Additional headers to attach to the message.
    headers: Vec<KafkaHeader>,
}
650
/// A single header attached to a [`KafkaMessage`].
#[derive(Debug, Clone, Serialize, Deserialize)]
struct KafkaHeader {
    /// The header key. Keys starting with `materialize-` are skipped when the
    /// message is sent.
    key: String,
    /// The header value, if any.
    value: Option<Vec<u8>>,
}
659
/// Builds the operator that writes the encoded `input` collection to Kafka
/// using transactions.
///
/// All data is exchanged to a single worker, chosen by hashing `sink_id`; the
/// other workers clear `write_frontier` and exit immediately. The active
/// worker stages updates in timestamp order and, whenever the input frontier
/// advances, commits a transaction that also records the new frontier in the
/// progress topic. After each commit the upper of the persist shard behind
/// `write_handle` is advanced to the committed frontier and `write_frontier`
/// is updated.
///
/// Returns a stream of health statuses derived from the operator's errors and
/// a button that shuts the operator down when dropped.
fn sink_collection<'scope>(
    name: String,
    input: VecCollection<'scope, Timestamp, KafkaMessage, Diff>,
    sink_id: GlobalId,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
    sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
    metrics: KafkaSinkMetrics,
    statistics: SinkStatistics,
    write_handle: impl Future<
        Output = anyhow::Result<WriteHandle<SourceData, (), Timestamp, StorageDiff>>,
    > + 'static,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
) -> (
    StreamVec<'scope, Timestamp, HealthStatusMessage>,
    PressOnDropButton,
) {
    let scope = input.scope();
    let mut builder = AsyncOperatorBuilder::new(name.clone(), input.inner.scope());

    // Exactly one worker writes to Kafka; every record is routed to it.
    let hashed_id = sink_id.hashed();
    let is_active_worker = usize::cast_from(hashed_id) % scope.peers() == scope.index();
    let buffer_min_capacity =
        KAFKA_BUFFERED_EVENT_RESIZE_THRESHOLD_ELEMENTS.handle(storage_configuration.config_set());

    let mut input = builder.new_disconnected_input(input.inner, Exchange::new(move |_| hashed_id));

    let as_of = sink.as_of.clone();
    let sink_version = sink.version;
    let (button, errors) = builder.build_fallible(move |_caps| {
        Box::pin(async move {
            if !is_active_worker {
                // This worker never writes anything, so its write frontier is
                // the empty frontier.
                write_frontier.borrow_mut().clear();
                return Ok(());
            }

            fail::fail_point!("kafka_sink_creation_error", |_| Err(
                ContextCreationError::Other(anyhow::anyhow!("synthetic error"))
            ));

            let mut write_handle = write_handle.await?;

            let metrics = Arc::new(metrics);

            let (mut producer, resume_upper) = TransactionalProducer::new(
                sink_id,
                &connection,
                &storage_configuration,
                Arc::clone(&metrics),
                statistics,
                sink_version,
            )
            .await?;

            // The input is considered overcompacted when..
            let overcompacted =
                // ..the sink has made progress before (the resume upper is not
                // the minimum timestamp)
                *resume_upper != [Timestamp::minimum()] &&
                // ..but the as_of is no longer less than or equal to it.
                !PartialOrder::less_equal(&as_of, &resume_upper);
            if overcompacted {
                let err = format!(
                    "{name}: input compacted past resume upper: as_of {}, resume_upper: {}",
                    as_of.pretty(),
                    resume_upper.pretty()
                );
                error!("{err}");
                return Err(anyhow!("{err}").into());
            }

            info!(
                "{name}: as_of: {}, resume upper: {}",
                as_of.pretty(),
                resume_upper.pretty()
            );

            // `upper` is the time up to which updates have been committed. An
            // empty resume upper means there is nothing left to write.
            let Some(mut upper) = resume_upper.clone().into_option() else {
                write_frontier.borrow_mut().clear();
                return Ok(());
            };

            // Updates at times beyond `upper`, held back until the frontier
            // advances past them.
            let mut deferred_updates = vec![];
            // Scratch buffer for the deferred updates released by a frontier
            // advancement.
            let mut extra_updates = vec![];
            // Whether a transaction is currently open. A transaction is only
            // begun once there is data to send or a frontier to commit.
            let mut transaction_begun = false;
            while let Some(event) = input.next().await {
                match event {
                    Event::Data(_cap, batch) => {
                        for (message, time, diff) in batch {
                            match upper.cmp(&time) {
                                // Beyond `upper`: hold back until later.
                                Ordering::Less => deferred_updates.push((message, time, diff)),
                                // Exactly at `upper`: stage it right away.
                                Ordering::Equal => {
                                    if !transaction_begun {
                                        producer.begin_transaction().await?;
                                        transaction_begun = true;
                                    }
                                    producer.send(&message, time, diff)?;
                                }
                                // Before `upper`: already committed, skip.
                                Ordering::Greater => continue,
                            }
                        }
                    }
                    Event::Progress(progress) => {
                        // Ignore frontiers that have not reached the resume
                        // upper yet.
                        if !PartialOrder::less_equal(&resume_upper, &progress) {
                            continue;
                        }
                        // Ignore frontiers that are not yet strictly beyond
                        // every element of the as_of.
                        if !as_of.iter().all(|t| !progress.less_equal(t)) {
                            continue;
                        }
                        if !transaction_begun {
                            producer.begin_transaction().await?;
                        }

                        // Release all deferred updates whose time is no longer
                        // beyond the frontier, and send them ordered by time.
                        extra_updates.extend(
                            deferred_updates
                                .extract_if(.., |(_, time, _)| !progress.less_equal(time)),
                        );
                        deferred_updates.shrink_to(buffer_min_capacity.get());
                        extra_updates.sort_unstable_by(|a, b| a.1.cmp(&b.1));

                        for (message, time, diff) in extra_updates.drain(..) {
                            producer.send(&message, time, diff)?;
                        }
                        extra_updates.shrink_to(buffer_min_capacity.get());

                        debug!("{name}: committing transaction for {}", progress.pretty());
                        producer.commit_transaction(progress.clone()).await?;
                        transaction_begun = false;
                        // Advance the shard's upper to `progress` by appending
                        // an empty batch, retrying with the reported upper if
                        // our expectation was stale.
                        let mut expect_upper = write_handle.shared_upper();
                        loop {
                            if PartialOrder::less_equal(&progress, &expect_upper) {
                                // The shard is already at or beyond `progress`.
                                break;
                            }
                            const EMPTY: &[((SourceData, ()), Timestamp, StorageDiff)] = &[];
                            match write_handle
                                .compare_and_append(EMPTY, expect_upper, progress.clone())
                                .await
                                .expect("valid usage")
                            {
                                Ok(()) => break,
                                Err(mismatch) => {
                                    expect_upper = mismatch.current;
                                }
                            }
                        }
                        write_frontier.borrow_mut().clone_from(&progress);
                        match progress.into_option() {
                            Some(new_upper) => upper = new_upper,
                            // The frontier is empty: the input is complete.
                            None => break,
                        }
                    }
                }
            }
            Ok(())
        })
    });

    // Turn operator errors into halting health statuses, with a hint for a
    // known single-broker misconfiguration.
    let statuses = errors.map(|error: Rc<ContextCreationError>| {
        let hint = match *error {
            ContextCreationError::KafkaError(KafkaError::Transaction(ref e)) => {
                if e.is_retriable() && e.code() == RDKafkaErrorCode::OperationTimedOut {
                    let hint = "If you're running a single Kafka broker, ensure that the configs \
                        transaction.state.log.replication.factor, transaction.state.log.min.isr, \
                        and offsets.topic.replication.factor are set to 1 on the broker";
                    Some(hint.to_owned())
                } else {
                    None
                }
            }
            _ => None,
        };

        HealthStatusMessage {
            id: None,
            update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), hint),
            // SSH errors are reported under their own namespace.
            namespace: if matches!(*error, ContextCreationError::Ssh(_)) {
                StatusNamespace::Ssh
            } else {
                StatusNamespace::Kafka
            },
        }
    });

    (statuses, button.press_on_drop())
}
897
/// Determines the latest progress record written for the sink `sink_id` by
/// reading the progress topic.
///
/// Returns `Ok(None)` if the progress topic does not exist or contains no
/// progress record for this sink.
///
/// # Errors
///
/// Returns an error if the consumers cannot be created, if the progress topic
/// cannot be registered or inspected, if it does not have exactly one
/// partition, or if reading it fails.
async fn determine_sink_progress(
    sink_id: GlobalId,
    connection: &KafkaSinkConnection,
    storage_configuration: &StorageConfiguration,
    metrics: Arc<KafkaSinkMetrics>,
) -> Result<Option<ProgressRecord>, ContextCreationError> {
    let TimeoutConfig {
        fetch_metadata_timeout,
        progress_record_fetch_timeout,
        ..
    } = storage_configuration.parameters.kafka_timeout_config;

    let client_id = connection.client_id(
        storage_configuration.config_set(),
        &storage_configuration.connection_context,
        sink_id,
    );
    let group_id = connection.progress_group_id(&storage_configuration.connection_context, sink_id);
    let progress_topic = connection
        .progress_topic(&storage_configuration.connection_context)
        .into_owned();
    let progress_topic_options = &connection.connection.progress_topic_options;
    let progress_key = ProgressKey::new(sink_id);

    // Options shared by both consumers created below.
    let common_options = btreemap! {
        "group.id" => group_id,
        "client.id" => client_id,
        "enable.auto.commit" => "false".into(),
        "auto.offset.reset" => "earliest".into(),
        // Surface the end of the partition as an error event, which
        // `progress_search` handles explicitly.
        "enable.partition.eof" => "true".into(),
    };

    // The consumer used to read progress records; it only sees committed
    // messages.
    let progress_client_read_committed: BaseConsumer<_> = {
        let mut opts = common_options.clone();
        opts.insert("isolation.level", "read_committed".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    // The consumer used only to fetch the topic's watermarks.
    // NOTE(review): presumably the watermarks differ by isolation level and
    // the uncommitted high watermark is the one wanted here; confirm.
    let progress_client_read_uncommitted: BaseConsumer<_> = {
        let mut opts = common_options;
        opts.insert("isolation.level", "read_uncommitted".into());
        let ctx = MzClientContext::default();
        connection
            .connection
            .create_with_context(storage_configuration, ctx, &opts, InTask::Yes)
            .await?
    };

    let ctx = Arc::clone(progress_client_read_committed.client().context());

    // Decide how strictly to verify the progress topic's configuration;
    // unknown settings fall back to skipping the check.
    let ensure_topic_config =
        match &*SINK_ENSURE_TOPIC_CONFIG.get(storage_configuration.config_set()) {
            "skip" => EnsureTopicConfig::Skip,
            "check" => EnsureTopicConfig::Check,
            "alter" => EnsureTopicConfig::Alter,
            _ => {
                tracing::warn!(
                    topic = progress_topic,
                    "unexpected value for ensure-topic-config; skipping checks"
                );
                EnsureTopicConfig::Skip
            }
        };
    mz_storage_client::sink::ensure_kafka_topic(
        connection,
        storage_configuration,
        &progress_topic,
        progress_topic_options,
        ensure_topic_config,
    )
    .await
    .add_context("error registering kafka progress topic for sink")?;

    // Cancellation token for the blocking task below: the task holds a weak
    // reference and `progress_search` bails out once no strong reference is
    // left. The strong reference lives in this future and is dropped
    // explicitly once the task has been awaited.
    let parent_token = Arc::new(());
    let child_token = Arc::downgrade(&parent_token);
    let task_name = format!("get_latest_ts:{sink_id}");
    let sink_progress_search = SINK_PROGRESS_SEARCH.get(storage_configuration.config_set());
    let result = task::spawn_blocking(|| task_name, move || {
        let progress_topic = progress_topic.as_ref();
        let partitions = match mz_kafka_util::client::get_partitions(
            progress_client_read_committed.client(),
            progress_topic,
            fetch_metadata_timeout,
        ) {
            Ok(partitions) => partitions,
            Err(GetPartitionsError::TopicDoesNotExist) => {
                // Without a progress topic there cannot be any progress.
                return Ok(None);
            }
            e => e.with_context(|| {
                format!(
                    "Unable to fetch metadata about progress topic {}",
                    progress_topic
                )
            })?,
        };
        if partitions.len() != 1 {
            bail!(
                    "Progress topic {} should contain a single partition, but instead contains {} partitions",
                    progress_topic, partitions.len(),
                );
        }
        let partition = partitions.into_element();

        metrics.consumed_progress_records.set(0);

        // The watermarks bound the range of offsets that has to be searched.
        let (lo, hi) = progress_client_read_uncommitted
            .fetch_watermarks(progress_topic, partition, fetch_metadata_timeout)
            .map_err(|e| {
                anyhow!(
                    "Failed to fetch metadata while reading from progress topic: {}",
                    e
                )
            })?;

        // Candidate start offsets. When the search is enabled, suffixes
        // covering a tenth, a hundredth, and so on of the range are added, as
        // long as they span at least 20,000 offsets. The candidates are tried
        // in reverse, i.e. shortest suffix first, falling back to the whole
        // range starting at `lo`.
        let mut start_indices = vec![lo];
        if sink_progress_search {
            let mut lookback = hi.saturating_sub(lo) / 10;
            while lookback >= 20_000 {
                start_indices.push(hi - lookback);
                lookback /= 10;
            }
        }
        for lo in start_indices.into_iter().rev() {
            if let Some(found) = progress_search(
                &progress_client_read_committed,
                progress_record_fetch_timeout,
                progress_topic,
                partition,
                lo,
                hi,
                progress_key.clone(),
                Weak::clone(&child_token),
                Arc::clone(&metrics)
            )? {
                return Ok(Some(found));
            }
        }
        Ok(None)
    }).await.check_ssh_status(&ctx);
    // Keep the token alive until the blocking task has finished.
    drop(parent_token);
    result
}
1114
1115fn progress_search<C: ConsumerContext + 'static>(
1116 progress_client_read_committed: &BaseConsumer<C>,
1117 progress_record_fetch_timeout: Duration,
1118 progress_topic: &str,
1119 partition: i32,
1120 lo: i64,
1121 hi: i64,
1122 progress_key: ProgressKey,
1123 child_token: Weak<()>,
1124 metrics: Arc<KafkaSinkMetrics>,
1125) -> anyhow::Result<Option<ProgressRecord>> {
1126 let mut tps = TopicPartitionList::new();
1128 tps.add_partition(progress_topic, partition);
1129 tps.set_partition_offset(progress_topic, partition, Offset::Offset(lo))?;
1130 progress_client_read_committed
1131 .assign(&tps)
1132 .with_context(|| {
1133 format!(
1134 "Error seeking in progress topic {}:{}",
1135 progress_topic, partition
1136 )
1137 })?;
1138
1139 let get_position = || {
1141 if child_token.strong_count() == 0 {
1142 bail!("operation cancelled");
1143 }
1144 let position = progress_client_read_committed
1145 .position()?
1146 .find_partition(progress_topic, partition)
1147 .ok_or_else(|| {
1148 anyhow!(
1149 "No position info found for progress topic {}",
1150 progress_topic
1151 )
1152 })?
1153 .offset();
1154 let position = match position {
1155 Offset::Offset(position) => position,
1156 Offset::Invalid => lo,
1166 _ => bail!(
1167 "Consumer::position returned offset of wrong type: {:?}",
1168 position
1169 ),
1170 };
1171 let outstanding = u64::try_from(std::cmp::max(0, hi - position)).unwrap();
1173 metrics.outstanding_progress_records.set(outstanding);
1174 Ok(position)
1175 };
1176
1177 info!("fetching latest progress record for {progress_key}, lo/hi: {lo}/{hi}");
1178
1179 let mut last_progress: Option<ProgressRecord> = None;
1203 loop {
1204 let current_position = get_position()?;
1205
1206 if current_position >= hi {
1207 break;
1209 }
1210
1211 let message = match progress_client_read_committed.poll(progress_record_fetch_timeout) {
1212 Some(Ok(message)) => message,
1213 Some(Err(KafkaError::PartitionEOF(_))) => {
1214 continue;
1219 }
1220 Some(Err(e)) => bail!("failed to fetch progress message {e}"),
1221 None => {
1222 bail!(
1223 "timed out while waiting to reach high water mark of non-empty \
1224 topic {progress_topic}:{partition}, lo/hi: {lo}/{hi}, current position: {current_position}"
1225 );
1226 }
1227 };
1228
1229 if message.key() != Some(progress_key.to_bytes()) {
1230 continue;
1232 }
1233
1234 metrics.consumed_progress_records.inc();
1235
1236 let Some(payload) = message.payload() else {
1237 continue;
1238 };
1239 let progress = parse_progress_record(payload)?;
1240
1241 match last_progress {
1242 Some(last_progress)
1243 if !PartialOrder::less_equal(&last_progress.frontier, &progress.frontier) =>
1244 {
1245 bail!(
1246 "upper regressed in topic {progress_topic}:{partition} from {:?} to {:?}",
1247 &last_progress.frontier,
1248 &progress.frontier,
1249 );
1250 }
1251 _ => last_progress = Some(progress),
1252 }
1253 }
1254
1255 Ok(last_progress)
1259}
1260
/// The legacy format of progress records, which `parse_progress_record` falls
/// back to when a payload does not parse as a [`ProgressRecord`].
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct LegacyProgressRecord {
    // The double `Option` distinguishes an absent field (`None`, through
    // `default`) from a field that is present: `deserialize_some` wraps
    // whatever was deserialized in `Some`, so a present-but-empty value
    // becomes `Some(None)` and a timestamp becomes `Some(Some(ts))`.
    #[serde(default, deserialize_with = "deserialize_some")]
    pub timestamp: Option<Option<Timestamp>>,
}
1274
1275fn deserialize_some<'de, T, D>(deserializer: D) -> Result<Option<T>, D::Error>
1277where
1278 T: Deserialize<'de>,
1279 D: Deserializer<'de>,
1280{
1281 Deserialize::deserialize(deserializer).map(Some)
1282}
1283
/// A record written to the progress topic each time a transaction is
/// committed.
#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct ProgressRecord {
    /// The frontier committed by the transaction, (de)serialized as the list
    /// of its elements.
    #[serde(
        deserialize_with = "deserialize_frontier",
        serialize_with = "serialize_frontier"
    )]
    pub frontier: Antichain<Timestamp>,
    /// The version of the sink that wrote the record; defaults to zero when
    /// the field is absent.
    #[serde(default)]
    pub version: u64,
}
1296fn serialize_frontier<S>(frontier: &Antichain<Timestamp>, serializer: S) -> Result<S::Ok, S::Error>
1297where
1298 S: Serializer,
1299{
1300 Serialize::serialize(frontier.elements(), serializer)
1301}
1302
1303fn deserialize_frontier<'de, D>(deserializer: D) -> Result<Antichain<Timestamp>, D::Error>
1304where
1305 D: Deserializer<'de>,
1306{
1307 let times: Vec<Timestamp> = Deserialize::deserialize(deserializer)?;
1308 Ok(Antichain::from(times))
1309}
1310
1311fn parse_progress_record(payload: &[u8]) -> Result<ProgressRecord, anyhow::Error> {
1312 Ok(match serde_json::from_slice::<ProgressRecord>(payload) {
1313 Ok(progress) => progress,
1314 Err(_) => match serde_json::from_slice::<LegacyProgressRecord>(payload) {
1316 Ok(LegacyProgressRecord {
1317 timestamp: Some(Some(time)),
1318 }) => ProgressRecord {
1319 frontier: Antichain::from_elem(time.step_forward()),
1320 version: 0,
1321 },
1322 Ok(LegacyProgressRecord {
1323 timestamp: Some(None),
1324 }) => ProgressRecord {
1325 frontier: Antichain::new(),
1326 version: 0,
1327 },
1328 _ => match std::str::from_utf8(payload) {
1329 Ok(payload) => bail!("invalid progress record: {payload}"),
1330 Err(_) => bail!("invalid progress record bytes: {payload:?}"),
1331 },
1332 },
1333 })
1334}
1335
1336async fn fetch_partition_count(
1338 producer: &ThreadedProducer<TunnelingClientContext<MzClientContext>>,
1339 sink_id: GlobalId,
1340 topic_name: &str,
1341) -> Result<u64, anyhow::Error> {
1342 let meta = task::spawn_blocking(|| format!("kafka_sink_fetch_partition_count:{sink_id}"), {
1343 let producer = producer.clone();
1344 move || {
1345 producer
1346 .client()
1347 .fetch_metadata(None, DEFAULT_FETCH_METADATA_TIMEOUT)
1348 }
1349 })
1350 .await
1351 .check_ssh_status(producer.context())?;
1352
1353 match meta.topics().iter().find(|t| t.name() == topic_name) {
1354 Some(topic) => {
1355 let partition_count = u64::cast_from(topic.partitions().len());
1356 if partition_count == 0 {
1357 bail!("topic {topic_name} has an impossible partition count of zero");
1358 }
1359 Ok(partition_count)
1360 }
1361 None => bail!("topic {topic_name} does not exist"),
1362 }
1363}
1364
1365async fn fetch_partition_count_loop<F>(
1371 producer: ThreadedProducer<TunnelingClientContext<MzClientContext>>,
1372 sink_id: GlobalId,
1373 topic_name: String,
1374 interval: Duration,
1375 update_partition_count: Arc<F>,
1376) where
1377 F: Fn(u64),
1378{
1379 let mut interval = time::interval(interval);
1380 interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
1381 loop {
1382 interval.tick().await;
1383 match fetch_partition_count(&producer, sink_id, &topic_name).await {
1384 Ok(pc) => update_partition_count(pc),
1385 Err(e) => {
1386 warn!(%sink_id, "failed updating partition count: {e}");
1387 continue;
1388 }
1389 };
1390 }
1391}
1392
/// Renders an async operator that encodes batches of sink updates into
/// `KafkaMessage`s.
///
/// The operator is called `name`, reads `batches` through the `Pipeline` pact
/// and has a single output. During startup it builds the key and value
/// encoders selected by `connection.format`; for Avro formats it connects to
/// the schema registry and publishes the schema under the subjects
/// `<topic>-key` and `<topic>-value`. It then consumes the input and emits one
/// message per diff pair yielded by `for_each_diff_pair`.
///
/// Returns:
///
/// * the collection of encoded messages, each emitted with `Diff::ONE`,
/// * a stream of health status messages: every error returned by the operator
///   body is reported as a halting update in the `Kafka` namespace,
/// * a button obtained from `press_on_drop()` on the operator's button.
///
/// # Panics
///
/// Panics if the operator is not handed exactly one capability, if a diff
/// pair has neither `before` nor `after` while headers or `partition_by` are
/// configured, or if `envelope` is `SinkEnvelope::Append`.
fn encode_collection<'scope>(
    name: String,
    batches: SinkBatchStream<'scope>,
    envelope: SinkEnvelope,
    connection: KafkaSinkConnection,
    storage_configuration: StorageConfiguration,
    sink_id: GlobalId,
    from_id: GlobalId,
    key_is_synthetic: bool,
) -> (
    VecCollection<'scope, Timestamp, KafkaMessage, Diff>,
    StreamVec<'scope, Timestamp, HealthStatusMessage>,
    PressOnDropButton,
) {
    let mut builder = AsyncOperatorBuilder::new(name, batches.scope());

    let (output, stream) = builder.new_output::<CapacityContainerBuilder<Vec<_>>>();
    let mut input = builder.new_input_for(batches, Pipeline, &output);

    let (button, errors) = builder.build_fallible(move |caps| {
        Box::pin(async move {
            // The operator has exactly one output, hence exactly one capability set.
            let [capset]: &mut [_; 1] = caps.try_into().unwrap();
            let key_desc = connection
                .key_desc_and_indices
                .as_ref()
                .map(|(desc, _indices)| desc.clone());
            let value_desc = connection.value_desc;

            // The key encoder exists only when both a key description and a key format
            // are configured; having just one of the two is reported as an error.
            let key_encoder: Option<Box<dyn Encode>> =
                match (key_desc, connection.format.key_format) {
                    (Some(desc), Some(KafkaSinkFormatType::Bytes)) => {
                        Some(Box::new(BinaryEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Text)) => {
                        Some(Box::new(TextEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Json)) => {
                        Some(Box::new(JsonEncoder::new(desc, false)))
                    }
                    (Some(desc), Some(KafkaSinkFormatType::Avro {
                        schema,
                        compatibility_level,
                        csr_connection,
                    })) => {
                        // Publish the key schema to the schema registry. This runs every
                        // time the operator body starts.
                        let ccsr = csr_connection
                            .connect(&storage_configuration, InTask::Yes)
                            .await?;

                        let schema_id = mz_storage_client::sink::publish_kafka_schema(
                            ccsr,
                            format!("{}-key", connection.topic),
                            schema.clone(),
                            mz_ccsr::SchemaType::Avro,
                            compatibility_level,
                        )
                        .await
                        .context("error publishing kafka schemas for sink")?;

                        Some(Box::new(AvroEncoder::new(desc, false, &schema, schema_id)))
                    }
                    (None, None) => None,
                    (desc, format) => {
                        return Err(anyhow!(
                            "key_desc and key_format must be both set or both unset, but key_desc: {:?}, key_format: {:?}",
                            desc,
                            format
                        ))
                    }
                };

            // Only the value encoders receive the Debezium flag; key encoders always get `false`.
            let debezium = matches!(envelope, SinkEnvelope::Debezium);

            let value_encoder: Box<dyn Encode> = match connection.format.value_format {
                KafkaSinkFormatType::Bytes => Box::new(BinaryEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Text => Box::new(TextEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Json => Box::new(JsonEncoder::new(value_desc, debezium)),
                KafkaSinkFormatType::Avro {
                    schema,
                    compatibility_level,
                    csr_connection,
                } => {
                    // Same registry round trip as for the key, under the `-value` subject.
                    let ccsr = csr_connection
                        .connect(&storage_configuration, InTask::Yes)
                        .await?;

                    let schema_id = mz_storage_client::sink::publish_kafka_schema(
                        ccsr,
                        format!("{}-value", connection.topic),
                        schema.clone(),
                        mz_ccsr::SchemaType::Avro,
                        compatibility_level,
                    )
                    .await
                    .context("error publishing kafka schemas for sink")?;

                    Box::new(AvroEncoder::new(value_desc, debezium, &schema, schema_id))
                }
            };

            // Replace the initial capabilities with an empty set. Every `?` in this
            // operator sits above this line; nothing below returns an error.
            // NOTE(review): presumably the operator must not fail once the initial
            // capabilities are gone — keep fallible work above this point.
            *capset = CapabilitySet::new();

            // Scratch buffers reused for every message.
            let mut row_buf = Row::default();
            let mut datums = DatumVec::new();
            // Keys are only handed to the warner when they are not synthetic.
            let mut pk_warner =
                (!key_is_synthetic).then(|| PkViolationWarner::new(sink_id, from_id));

            while let Some(event) = input.next().await {
                // Events other than `Event::Data` are ignored.
                if let Event::Data(cap, mut batches) = event {
                    for batch in batches.drain(..) {
                        for_each_diff_pair(&batch, |key, time, value| {
                            if let Some(warner) = pk_warner.as_mut() {
                                warner.observe(key, time);
                            }
                            // Without a key encoder the message carries no key at all.
                            let key_for_message = if key_encoder.is_some() { key } else { &None };

                            let mut hash = None;
                            let mut headers = vec![];
                            if connection.headers_index.is_some()
                                || connection.partition_by.is_some()
                            {
                                // Headers and the partition hash are computed from a single
                                // row: `after` when present, otherwise `before`.
                                let row = value
                                    .after
                                    .as_ref()
                                    .or(value.before.as_ref())
                                    .expect("one of before or after must be set");
                                let row = datums.borrow_with(row);

                                if let Some(i) = connection.headers_index {
                                    headers = encode_headers(row[i]);
                                }

                                if let Some(partition_by) = &connection.partition_by {
                                    hash = Some(evaluate_partition_by(partition_by, &row));
                                }
                            }
                            // Hash precedence: the `partition_by` result if configured, else
                            // the key encoder's hash of the encoded key, else 0 for keyless
                            // messages.
                            let (encoded_key, hash) = match key_for_message {
                                Some(key) => {
                                    let key_encoder =
                                        key_encoder.as_ref().expect("key present");
                                    let encoded = key_encoder.encode_unchecked(key.clone());
                                    let hash =
                                        hash.unwrap_or_else(|| key_encoder.hash(&encoded));
                                    (Some(encoded), hash)
                                }
                                None => (None, hash.unwrap_or(0)),
                            };
                            // Upsert forwards the `after` row (which may be absent); Debezium
                            // packs the diff pair into `row_buf` via `dbz_format`.
                            let value = match envelope {
                                SinkEnvelope::Upsert => value.after,
                                SinkEnvelope::Debezium => {
                                    dbz_format(&mut row_buf.packer(), value);
                                    Some(row_buf.clone())
                                }
                                SinkEnvelope::Append => {
                                    unreachable!("Append envelope is not valid for Kafka sinks")
                                }
                            };
                            let value = value.map(|value| value_encoder.encode_unchecked(value));
                            let message = KafkaMessage {
                                hash,
                                key: encoded_key,
                                value,
                                headers,
                            };
                            output.give(&cap, (message, time, Diff::ONE));
                        });
                        // Flush the warner once per batch.
                        if let Some(warner) = pk_warner.as_mut() {
                            warner.flush();
                        }
                    }
                }
            }

            Ok::<(), anyhow::Error>(())
        })
    });

    // Surface operator errors as halting health updates in the Kafka namespace.
    let statuses = errors.map(|error| HealthStatusMessage {
        id: None,
        update: HealthStatusUpdate::halting(format!("{}", error.display_with_causes()), None),
        namespace: StatusNamespace::Kafka,
    });

    (stream.as_collection(), statuses, button.press_on_drop())
}
1615
1616fn encode_headers(datum: Datum) -> Vec<KafkaHeader> {
1617 let mut out = vec![];
1618 if datum.is_null() {
1619 return out;
1620 }
1621 for (key, value) in datum.unwrap_map().iter() {
1622 out.push(KafkaHeader {
1623 key: key.into(),
1624 value: match value {
1625 Datum::Null => None,
1626 Datum::String(s) => Some(s.as_bytes().to_vec()),
1627 Datum::Bytes(b) => Some(b.to_vec()),
1628 _ => panic!("encode_headers called with unexpected header value {value:?}"),
1629 },
1630 })
1631 }
1632 out
1633}
1634
1635fn evaluate_partition_by(partition_by: &MirScalarExpr, row: &[Datum]) -> u64 {
1643 let temp_storage = RowArena::new();
1651 match partition_by.eval(row, &temp_storage) {
1652 Ok(Datum::UInt64(u)) => u,
1653 Ok(datum) => {
1654 soft_assert_or_log!(datum.is_null(), "unexpected partition_by result: {datum:?}");
1657 0
1659 }
1660 Err(_) => 0,
1661 }
1662}
1663
#[cfg(test)]
mod test {
    use mz_ore::assert_err;

    use super::*;

    /// Exercises `parse_progress_record` on both the legacy (`timestamp`) and
    /// the current (`frontier` / `version`) record layouts.
    #[mz_ore::test]
    fn progress_record_migration() {
        // An object with neither `timestamp` nor `frontier` is rejected.
        assert_err!(parse_progress_record(b"{}"));

        // Payloads that must parse, paired with the record they must produce.
        let accepted = [
            // Legacy layout: the stored timestamp is stepped forward.
            (
                b"{\"timestamp\":1}".as_slice(),
                ProgressRecord {
                    frontier: Antichain::from_elem(2.into()),
                    version: 0,
                },
            ),
            // Legacy layout with an explicit null: empty frontier.
            (
                b"{\"timestamp\":null}".as_slice(),
                ProgressRecord {
                    frontier: Antichain::new(),
                    version: 0,
                },
            ),
            // Current layout without a version: the version comes out as 0.
            (
                b"{\"frontier\":[1]}".as_slice(),
                ProgressRecord {
                    frontier: Antichain::from_elem(1.into()),
                    version: 0,
                },
            ),
            (
                b"{\"frontier\":[]}".as_slice(),
                ProgressRecord {
                    frontier: Antichain::new(),
                    version: 0,
                },
            ),
            // Current layout with an explicit version.
            (
                b"{\"frontier\":[], \"version\": 42}".as_slice(),
                ProgressRecord {
                    frontier: Antichain::new(),
                    version: 42,
                },
            ),
        ];
        for (payload, expected) in accepted {
            assert_eq!(parse_progress_record(payload).unwrap(), expected);
        }

        // A null frontier is not a valid record in either layout.
        assert_err!(parse_progress_record(b"{\"frontier\":null}"));
    }
}