pub fn produce_to_kafka<G>(
    stream: Stream<G, ((Option<Vec<u8>>, Option<Vec<u8>>), Timestamp, Diff)>,
    id: GlobalId,
    name: String,
    connection: KafkaSinkConnection,
    as_of: SinkAsOf,
    shared_gate_ts: Rc<Cell<Option<Timestamp>>>,
    write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
    metrics: &KafkaBaseMetrics,
    connection_context: &ConnectionContext,
) -> Rc<dyn Any>
where
    G: Scope<Timestamp = Timestamp>,

Produces/sends a stream of encoded rows (as Vec<u8>) to Kafka.
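
For orientation, the element type can be read as a keyed, timestamped, signed update. The aliases below are an illustrative assumption based on the signature, not definitions from this crate:

// Illustrative aliases only; `Timestamp` and `Diff` stand in for this
// crate's actual types, and the (key, value) reading of the byte pair is
// an assumption inferred from the signature, where `None` as a value
// conventionally encodes a Kafka tombstone.
type Timestamp = u64;
type Diff = i64;
type EncodedPair = (Option<Vec<u8>>, Option<Vec<u8>>); // (encoded key, encoded value)
type SinkUpdate = (EncodedPair, Timestamp, Diff);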

This operator exchanges all updates to a single worker by hashing on the given sink id.
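
As a sketch of this exchange pattern (not this operator's actual code), a minimal timely dataflow operator that routes every record to one worker by hashing on a constant key could look as follows. The function name `exchange_to_one_worker` and the parameter `id_hash` are hypothetical, and the `swap`/`give_vec` idiom assumes a timely 0.12-era API:

use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, Stream};
use timely::ExchangeData;

// Route every record to the same worker: the routing function ignores the
// record and always returns `id_hash`, so all updates land on the worker
// with index `id_hash % peers`.
fn exchange_to_one_worker<G: Scope, D: ExchangeData>(
    stream: &Stream<G, D>,
    id_hash: u64,
) -> Stream<G, D> {
    stream.unary(
        Exchange::new(move |_| id_hash),
        "SingleWorkerExchange",
        |_capability, _info| {
            let mut buffer = Vec::new();
            move |input, output| {
                input.for_each(|time, data| {
                    data.swap(&mut buffer);
                    // Forward records unchanged; only their placement changes.
                    output.session(&time).give_vec(&mut buffer);
                });
            }
        },
    )
}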

Updates are only sent to Kafka once the input frontier has passed their time. Updates are sent in ascending timestamp order, and the relative order of updates at the same timestamp is preserved. Keep in mind, however, that this operator exchanges updates, so if the input stream is sharded, updates will likely arrive at this operator in a nondeterministic order.
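
The buffering rule can be illustrated with a small standalone sketch (again, not the operator's actual code; it assumes a totally ordered timestamp and models the input frontier as a single time):

use std::collections::BTreeMap;

// Stash updates until the frontier passes their time, then drain them in
// ascending timestamp order. A BTreeMap gives the ordered drain for free,
// and the per-timestamp Vec preserves arrival order within a timestamp.
struct Pending<T: Ord, D> {
    pending: BTreeMap<T, Vec<D>>,
}

impl<T: Ord + Clone, D> Pending<T, D> {
    fn new() -> Self {
        Pending { pending: BTreeMap::new() }
    }

    fn stash(&mut self, time: T, update: D) {
        self.pending.entry(time).or_default().push(update);
    }

    // Drain every update whose time is strictly less than `frontier`, i.e.
    // whose time the input frontier has passed, in ascending time order.
    fn drain_ready(&mut self, frontier: &T) -> Vec<(T, D)> {
        // `split_off` keeps times < frontier in `self.pending` and returns
        // times >= frontier; swap so the not-yet-ready updates stay stashed.
        let not_ready = self.pending.split_off(frontier);
        let ready = std::mem::replace(&mut self.pending, not_ready);
        ready
            .into_iter()
            .flat_map(|(t, ds)| ds.into_iter().map(move |d| (t.clone(), d)))
            .collect()
    }
}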

Updates that are not beyond the given SinkAsOf and/or the gate_ts in KafkaSinkConnection are discarded rather than produced.
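
Concretely, this gating decision reduces to a predicate like the one below. This is a sketch under stated assumptions: timestamps are compared as plain integers, SinkAsOf is modeled as a single frontier time plus a strictness flag, and gate_ts marks a time at or below which updates were already durably produced:

// Hypothetical predicate for the discard rule described above; `strict`
// models whether updates at exactly the as-of time are also discarded.
fn should_produce(time: u64, as_of: u64, strict: bool, gate_ts: Option<u64>) -> bool {
    // Beyond the as-of frontier: strictly greater when strict, else at least.
    let beyond_as_of = if strict { time > as_of } else { time >= as_of };
    // Beyond the gate: updates at or below gate_ts were (by assumption)
    // already written by an earlier incarnation of the sink, so re-sending
    // them would duplicate data in the Kafka topic.
    let beyond_gate = gate_ts.map_or(true, |gate| time > gate);
    beyond_as_of && beyond_gate
}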