Struct dataflow::sink::kafka::KafkaSinkState
struct KafkaSinkState {
name: String,
topic: String,
topic_prefix: String,
shutdown_flag: Arc<AtomicBool>,
metrics: Arc<SinkMetrics>,
producer: KafkaTxProducer,
activator: Activator,
transactional: bool,
pending_rows: HashMap<Timestamp, Vec<EncodedRow>>,
ready_rows: VecDeque<(Timestamp, Vec<EncodedRow>)>,
sink_state: KafkaSinkStateEnum,
latest_progress_ts: Timestamp,
write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
}
Fields
name: String
topic: String
topic_prefix: String
shutdown_flag: Arc<AtomicBool>
metrics: Arc<SinkMetrics>
producer: KafkaTxProducer
activator: Activator
transactional: bool
pending_rows: HashMap<Timestamp, Vec<EncodedRow>>
ready_rows: VecDeque<(Timestamp, Vec<EncodedRow>)>
sink_state: KafkaSinkStateEnum
latest_progress_ts: Timestamp
Timestamp of the latest END record that was written out to Kafka.
write_frontier: Rc<RefCell<Antichain<Timestamp>>>
Write frontier of this sink.
The write frontier potentially blocks compaction of timestamp bindings
in upstream sources. The latest written END record is used when
restarting the sink to gate updates with a lower timestamp. We advance
the write frontier in lockstep with writing out END records. This
ensures that we don’t write updates more than once, which provides
exactly-once guarantees.
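The gating described above can be illustrated with a minimal sketch. It is not the sink's actual code: timestamps are modeled as plain `u64`, the write frontier as a single value rather than an `Antichain<Timestamp>`, and the names `GatedSink`, `resume_from`, `should_emit`, and `write_end_record` are hypothetical. It also assumes that an END record at `t` covers exactly the updates with timestamps lower than `t`.

```rust
/// Hypothetical, simplified stand-in for the sink's progress state.
#[derive(Debug)]
struct GatedSink {
    /// Timestamp of the latest END record known to be written, if any.
    latest_end_ts: Option<u64>,
    /// Simplified write frontier (the real field is an antichain).
    write_frontier: u64,
}

impl GatedSink {
    /// On restart, resume from the latest END record found in Kafka.
    fn resume_from(latest_end_ts: Option<u64>) -> Self {
        GatedSink {
            latest_end_ts,
            write_frontier: latest_end_ts.unwrap_or(0),
        }
    }

    /// Updates with a timestamp lower than the latest END record are
    /// gated (skipped), since they are assumed to be written already.
    fn should_emit(&self, ts: u64) -> bool {
        self.latest_end_ts.map_or(true, |end| ts >= end)
    }

    /// Records an END record and advances the write frontier in the same
    /// step, so the two never drift apart. Never moves backwards.
    fn write_end_record(&mut self, ts: u64) {
        if self.latest_end_ts.map_or(true, |end| ts > end) {
            self.latest_end_ts = Some(ts);
            self.write_frontier = ts;
        }
    }
}
```

Because the frontier only moves when an END record is recorded, a restarted sink that reads back the latest END record can skip everything the previous incarnation already wrote.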
Implementations
fn new(
connector: KafkaSinkConnector,
sink_name: String,
sink_id: &GlobalId,
worker_id: String,
shutdown_flag: Arc<AtomicBool>,
activator: Activator,
write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
metrics: &KafkaBaseMetrics
) -> Self
async fn retry_on_txn_error<'a, F, Fut, T>(&self, f: F) -> KafkaResult<T> where
F: Fn(KafkaTxProducer) -> Fut,
Fut: Future<Output = KafkaResult<T>>,
async fn send<'a, K, P>(&self, record: BaseRecord<'a, K, P>) -> KafkaResult<()> where
K: ToBytes + ?Sized,
P: ToBytes + ?Sized,
async fn send_consistency_record(
&self,
transaction_id: &str,
status: &str,
message_count: Option<i64>,
consistency: &KafkaConsistencyRunningState
) -> KafkaResult<()>
Asserts that the write frontier has not yet advanced beyond t.
Updates the latest progress update timestamp to latest_update_ts if it
is greater than the currently maintained timestamp.
This does not emit a progress update and should be used when the sink emits a progress update that is not based on updates of the input frontier.
See maybe_emit_progress.
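The two behaviors described above (checking the frontier against a timestamp, and keeping only the larger progress timestamp) can be sketched as follows. The function names `frontier_not_beyond` and `bump_latest_progress` are hypothetical, and the write frontier is simplified to a single `u64` instead of an antichain.

```rust
/// Hypothetical helper: true while the (simplified, single-element)
/// write frontier has not advanced beyond `t`.
fn frontier_not_beyond(write_frontier: u64, t: u64) -> bool {
    write_frontier <= t
}

/// Hypothetical helper: overwrite the maintained timestamp only when the
/// new one is greater. Returns whether the stored value changed. Nothing
/// is emitted here; only the bookkeeping is updated.
fn bump_latest_progress(latest_progress_ts: &mut u64, latest_update_ts: u64) -> bool {
    if latest_update_ts > *latest_progress_ts {
        *latest_progress_ts = latest_update_ts;
        true
    } else {
        false
    }
}
```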
async fn maybe_emit_progress<'a>(
&mut self,
input_frontier: AntichainRef<'a, Timestamp>
) -> Result<bool, Error>
Updates the latest progress update timestamp based on the given input frontier and pending rows.
If the frontier advanced, this emits an END record to the consistency
topic and advances the maintained write frontier, which in turn
unblocks compaction of timestamp bindings in sources.
NOTE: END records will only be emitted when
KafkaSinkConnector.consistency points to a consistency topic. The
write frontier will be advanced regardless.
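A synchronous sketch of the behavior described above follows. It is an illustration under stated assumptions, not the method's implementation: `ProgressSketch` and its fields `has_consistency_topic` and `end_records` are hypothetical, timestamps are plain `u64`, and the rule for combining the input frontier with pending rows (taking the minimum over both) is an assumption.

```rust
use std::collections::HashMap;

/// Hypothetical, synchronous stand-in for the sink state.
struct ProgressSketch {
    pending_rows: HashMap<u64, Vec<String>>,
    latest_progress_ts: u64,
    /// Simplified write frontier (the real field is an antichain).
    write_frontier: u64,
    /// Stands in for `KafkaSinkConnector.consistency` being set.
    has_consistency_topic: bool,
    /// END records "written" so far (stands in for the Kafka topic).
    end_records: Vec<u64>,
}

impl ProgressSketch {
    /// Returns `true` if progress advanced.
    fn maybe_emit_progress(&mut self, input_frontier: &[u64]) -> bool {
        // Assumption: progress is bounded by the input frontier and by the
        // earliest timestamp that still has rows waiting to be written.
        let candidate = input_frontier
            .iter()
            .chain(self.pending_rows.keys())
            .copied()
            .min();
        let ts = match candidate {
            Some(ts) if ts > self.latest_progress_ts => ts,
            _ => return false, // nothing advanced
        };
        // END records are only emitted when a consistency topic exists...
        if self.has_consistency_topic {
            self.end_records.push(ts);
        }
        // ...but the write frontier advances regardless.
        self.latest_progress_ts = ts;
        self.write_frontier = ts;
        true
    }
}
```

In this sketch, a pending row at timestamp 3 holds progress at 3 even when the input frontier is already at 10; once the pending rows are cleared, the next call moves progress to 10.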
Auto Trait Implementations
impl !RefUnwindSafe for KafkaSinkState
impl !Send for KafkaSinkState
impl !Sync for KafkaSinkState
impl Unpin for KafkaSinkState
impl !UnwindSafe for KafkaSinkState
Blanket Implementations
Mutably borrows from an owned value.
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.