Struct mz_compute::sink::kafka::KafkaSinkState
struct KafkaSinkState {
name: String,
topic: String,
topic_prefix: String,
shutdown_flag: Arc<AtomicBool>,
metrics: Arc<SinkMetrics>,
producer: KafkaTxProducer,
activator: Activator,
transactional: bool,
pending_rows: HashMap<Timestamp, Vec<EncodedRow>>,
ready_rows: VecDeque<(Timestamp, Vec<EncodedRow>)>,
retry_manager: Arc<Mutex<KafkaSinkSendRetryManager>>,
sink_state: KafkaSinkStateEnum,
latest_progress_ts: Timestamp,
write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
}
Fields
name: String
topic: String
topic_prefix: String
shutdown_flag: Arc<AtomicBool>
metrics: Arc<SinkMetrics>
producer: KafkaTxProducer
activator: Activator
transactional: bool
pending_rows: HashMap<Timestamp, Vec<EncodedRow>>
ready_rows: VecDeque<(Timestamp, Vec<EncodedRow>)>
retry_manager: Arc<Mutex<KafkaSinkSendRetryManager>>
sink_state: KafkaSinkStateEnum
latest_progress_ts: Timestamp
Timestamp of the latest `END` record that was written out to Kafka.
write_frontier: Rc<RefCell<Antichain<Timestamp>>>
Write frontier of this sink.
The write frontier potentially blocks compaction of timestamp bindings in upstream sources. The latest written `END` record is used when restarting the sink to gate updates with a lower timestamp. We advance the write frontier in lockstep with writing out `END` records. This ensures that we don’t write updates more than once, which preserves exactly-once guarantees.
Implementations
sourceimpl KafkaSinkState
impl KafkaSinkState
fn new(
connection: KafkaSinkConnection,
sink_name: String,
sink_id: &GlobalId,
worker_id: String,
shutdown_flag: Arc<AtomicBool>,
activator: Activator,
write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
metrics: &KafkaBaseMetrics,
connection_context: &ConnectionContext
) -> Self
fn create_producer_config(
connection: &KafkaSinkConnection,
connection_context: &ConnectionContext
) -> ClientConfig
fn create_consistency_client_config(
connection: &KafkaSinkConnection,
connection_context: &ConnectionContext
) -> ClientConfig
async fn retry_on_txn_error<'a, F, Fut, T>(&self, f: F) -> KafkaResult<T> where
F: Fn(KafkaTxProducer) -> Fut,
Fut: Future<Output = KafkaResult<T>>,
async fn send<'a, K, P>(&self, record: BaseRecord<'a, K, P>) -> KafkaResult<()> where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
async fn flush(&self) -> KafkaResult<()>
async fn flush_inner(&self) -> KafkaResult<()>
async fn determine_latest_consistency_record(
&self
) -> Result<Option<Timestamp>, Error>
async fn send_consistency_record(
&self,
transaction_id: &str,
status: &str,
message_count: Option<i64>,
consistency: &KafkaConsistencyRunningState
) -> KafkaResult<()>
sourcefn assert_progress(&self, ts: &Timestamp)
fn assert_progress(&self, ts: &Timestamp)
Asserts that the write frontier has not yet advanced beyond `ts`.
sourcefn maybe_update_progress(&mut self, latest_update_ts: &Timestamp)
fn maybe_update_progress(&mut self, latest_update_ts: &Timestamp)
Updates the latest progress update timestamp to `latest_update_ts` if it is greater than the currently maintained timestamp.
This does not emit a progress update and should be used when the sink emits a progress update that is not based on updates of the input frontier.
See `maybe_emit_progress`.
sourceasync fn maybe_emit_progress<'a>(
&mut self,
input_frontier: AntichainRef<'a, Timestamp>
) -> Result<bool, Error>
async fn maybe_emit_progress<'a>(
&mut self,
input_frontier: AntichainRef<'a, Timestamp>
) -> Result<bool, Error>
Updates the latest progress update timestamp based on the given input frontier and pending rows.
This will emit an `END` record to the consistency topic if the frontier advanced, and advance the maintained write frontier, which will in turn unblock compaction of timestamp bindings in sources.
NOTE: `END` records will only be emitted when `KafkaSinkConnection.consistency` points to a consistency topic. The write frontier will be advanced regardless.
Auto Trait Implementations
impl !RefUnwindSafe for KafkaSinkState
impl !Send for KafkaSinkState
impl !Sync for KafkaSinkState
impl Unpin for KafkaSinkState
impl !UnwindSafe for KafkaSinkState
Blanket Implementations
sourceimpl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
const: unstable · sourcefn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
sourceimpl<T> FutureExt for T
impl<T> FutureExt for T
sourcefn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
sourcefn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
sourceimpl<T> Instrument for T
impl<T> Instrument for T
sourcefn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
sourcefn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
sourceimpl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
sourcefn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
Wrap the input message `T` in a `tonic::Request`.
sourceimpl<T> Pointable for T
impl<T> Pointable for T
sourceimpl<P, R> ProtoType<R> for P where
R: RustType<P>,
impl<P, R> ProtoType<R> for P where
R: RustType<P>,
sourcefn into_rust(self) -> Result<R, TryFromProtoError>
fn into_rust(self) -> Result<R, TryFromProtoError>
See `RustType::from_proto`.
sourcefn from_rust(rust: &R) -> P
fn from_rust(rust: &R) -> P
See `RustType::into_proto`.
sourceimpl<T> WithSubscriber for T
impl<T> WithSubscriber for T
sourcefn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where
S: Into<Dispatch>,
fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where
S: Into<Dispatch>,
Attaches the provided `Subscriber` to this type, returning a `WithDispatch` wrapper. Read more
sourcefn with_current_subscriber(self) -> WithDispatch<Self>
fn with_current_subscriber(self) -> WithDispatch<Self>
Attaches the current default `Subscriber` to this type, returning a `WithDispatch` wrapper. Read more