fn sink_collection<G: Scope<Timestamp = Timestamp>>(
name: String,
input: &Collection<G, KafkaMessage, Diff>,
sink_id: GlobalId,
connection: KafkaSinkConnection,
storage_configuration: StorageConfiguration,
sink: &StorageSinkDesc<CollectionMetadata, Timestamp>,
metrics: KafkaSinkMetrics,
statistics: SinkStatistics,
write_handle: impl Future<Output = Result<WriteHandle<SourceData, (), Timestamp, Diff>>> + 'static,
write_frontier: Rc<RefCell<Antichain<Timestamp>>>,
) -> (Stream<G, HealthStatusMessage>, PressOnDropButton)
Expand description
Sinks a collection of encoded rows to Kafka.
This operator exchanges all updates to a single worker by hashing on the given sink id.
Updates are sent in ascending timestamp order.