fn encode_collection<'scope>(
name: String,
batches: StreamVec<'scope, Timestamp, <TraceAgent<OrdValSpine<Option<Row>, Row, Timestamp, Diff>> as TraceReader>::Batch>,
envelope: SinkEnvelope,
connection: KafkaSinkConnection,
storage_configuration: StorageConfiguration,
sink_id: GlobalId,
from_id: GlobalId,
key_is_synthetic: bool,
) -> (VecCollection<'scope, Timestamp, KafkaMessage, Diff>, StreamVec<'scope, Timestamp, HealthStatusMessage>, PressOnDropButton)
Walks each arrangement batch and emits encoded Kafka messages, one per
DiffPair observed at each (key, timestamp).
When key_is_synthetic is true, the batch keys are per-row hashes used only for
worker distribution, so each KafkaMessage is emitted without a key.
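
The key handling described above can be illustrated with a minimal sketch. It does not use the real types: `DiffPair`, `Message`, `Group`, and `encode_group` below are hypothetical stand-ins, and the value encoding (a debug-formatted before/after pair) is an assumption made only to keep the example self-contained. The sketch shows one message per DiffPair in a (key, timestamp) group, with the key dropped when `key_is_synthetic` is set.

```rust
// Hypothetical stand-ins for illustration; these are NOT the real
// DiffPair / KafkaMessage types used by encode_collection.
#[derive(Debug, Clone, PartialEq)]
struct DiffPair {
    before: Option<String>,
    after: Option<String>,
}

#[derive(Debug, Clone, PartialEq)]
struct Message {
    key: Option<Vec<u8>>,
    value: Vec<u8>,
}

/// All DiffPairs observed for one (key, timestamp) in a batch.
struct Group {
    key: Option<String>,
    pairs: Vec<DiffPair>,
}

/// Emits one message per DiffPair. When `key_is_synthetic` is true the
/// group key is treated as a distribution-only hash and is not emitted.
fn encode_group(group: &Group, key_is_synthetic: bool) -> Vec<Message> {
    group
        .pairs
        .iter()
        .map(|pair| {
            let key = if key_is_synthetic {
                None
            } else {
                group.key.as_ref().map(|k| k.as_bytes().to_vec())
            };
            // Assumed placeholder encoding; the real encoder depends on
            // the sink envelope and connection format.
            let value = format!("{:?} -> {:?}", pair.before, pair.after).into_bytes();
            Message { key, value }
        })
        .collect()
}
```

In this sketch the message values are identical for both settings of the flag; only the presence of the key differs.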