Struct dataflow::source::KafkaSourceReader
pub struct KafkaSourceReader {
topic_name: String,
source_name: String,
id: SourceInstanceId,
consumer: Arc<BaseConsumer<GlueConsumerContext>>,
partition_consumers: VecDeque<PartitionConsumer>,
worker_id: usize,
worker_count: usize,
last_offsets: HashMap<i32, i64>,
start_offsets: HashMap<i32, i64>,
logger: Option<Logger>,
stats_rx: Receiver<Jsonb>,
last_stats: Option<Jsonb>,
partition_info: Arc<Mutex<Option<Vec<i32>>>>,
}
Contains all information necessary to ingest data from Kafka
Fields
topic_name: String
Name of the topic that backs this source
source_name: String
Name of the source (will have format kafka-source-id)
id: SourceInstanceId
Source instance ID
consumer: Arc<BaseConsumer<GlueConsumerContext>>
Kafka consumer for this source
partition_consumers: VecDeque<PartitionConsumer>
List of consumers. A consumer should be assigned per partition to guarantee fairness
worker_id: usize
Worker ID
worker_count: usize
Total count of workers
last_offsets: HashMap<i32, i64>
Map from partition -> most recently read offset
start_offsets: HashMap<i32, i64>
Map from partition -> offset to start reading at
logger: Option<Logger>
Timely worker logger for source events
stats_rx: Receiver<Jsonb>
Channel to receive Kafka statistics JSON blobs from the stats callback.
last_stats: Option<Jsonb>
The most recent statistics JSON blob received on stats_rx, if any
partition_info: Arc<Mutex<Option<Vec<i32>>>>
The last partition info we received
Implementations
pub fn new(
source_name: String,
source_id: SourceInstanceId,
worker_id: usize,
worker_count: usize,
consumer_activator: SyncActivator,
kc: KafkaSourceConnector,
restored_offsets: Vec<(PartitionId, Option<MzOffset>)>,
logger: Option<Logger>
) -> KafkaSourceReader
Constructor
Ensures that a partition queue for pid exists.
In Kafka, partitions are assigned contiguously. This function consequently creates partition queues for every p <= pid.
Returns the total number of consumers for this source.
Creates a new partition queue for partition_id.
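The contiguous-assignment invariant above can be sketched with plain standard-library types. This is an illustrative helper under assumed names, not the real implementation; `PartitionConsumer` here is a hypothetical stand-in for the actual per-partition consumer:

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical stand-in for a per-partition consumer queue.
struct PartitionConsumer {
    pid: i32,
    queue: VecDeque<Vec<u8>>, // pending raw messages
}

/// Because Kafka assigns partition ids contiguously from 0, ensuring a
/// queue for `pid` means creating one for every p <= pid that does not
/// already exist.
fn ensure_partition_queue(
    consumers: &mut Vec<PartitionConsumer>,
    last_offsets: &mut HashMap<i32, i64>,
    pid: i32,
) {
    let next = consumers.len() as i32;
    for p in next..=pid {
        consumers.push(PartitionConsumer {
            pid: p,
            queue: VecDeque::new(),
        });
        // A freshly created partition has no offset read yet.
        last_offsets.entry(p).or_insert(-1);
    }
}
```

Calling `ensure_partition_queue(..., 3)` on an empty reader thus materializes queues for partitions 0 through 3 in one pass.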
Fast-forwards the consumer to the specified Kafka offset. Prints a warning if it fails to do so. Assumption: if the offset does not exist (for instance, because of compaction), the consumer will seek to the next available offset.
Polls the next consumer for which a message is available. Consumers are polled round-robin: after a consumer is polled, it is placed at the back of the queue.
If a message has an offset smaller than the next expected offset for this consumer (and this partition), we skip the message and seek to the appropriate offset.
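The skip-or-accept decision can be shown as a small, self-contained helper. This is a sketch of the rule described above, assuming `last_offsets` tracks the most recently read offset per partition (the `OffsetAction` enum and `check_offset` function are illustrative names, not the real API):

```rust
/// What to do with a message, given the partition's most recently read
/// offset.
#[derive(Debug, PartialEq)]
enum OffsetAction {
    /// Deliver the message and advance the partition's last offset.
    Accept,
    /// The message was already ingested (e.g. replayed after a seek);
    /// skip it and seek past it.
    Skip,
}

fn check_offset(last_offset: i64, msg_offset: i64) -> OffsetAction {
    // The next expected offset is last_offset + 1; anything at or below
    // last_offset has already been read.
    if msg_offset <= last_offset {
        OffsetAction::Skip
    } else {
        OffsetAction::Accept
    }
}
```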
Read any statistics JSON blobs generated via the rdkafka statistics callback.
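The statistics channel is drained non-blockingly, keeping only the newest blob. A minimal sketch, assuming `std::sync::mpsc` stands in for the actual channel type and `String` stands in for `Jsonb`:

```rust
use std::sync::mpsc;

/// Drain all pending statistics blobs from the channel, retaining only
/// the most recent one (older blobs are superseded). Illustrative
/// helper, not the real method.
fn update_stats(rx: &mpsc::Receiver<String>, last_stats: &mut Option<String>) {
    // try_recv never blocks: we consume whatever the stats callback has
    // queued since the last poll, then return.
    while let Ok(blob) = rx.try_recv() {
        *last_stats = Some(blob);
    }
}
```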
Polls from the next partition queue and returns the message, if any.
We maintain the list of partition queues in a queue, and add queues that we polled from to the end of the queue. We thus swing through all available partition queues in a somewhat fair manner.
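The rotation described above can be sketched with a `VecDeque`: pop the front consumer, poll it, and push it to the back so every partition gets a regular turn. Each inner queue here stands in for one partition's pending messages; the names are illustrative, not the real API:

```rust
use std::collections::VecDeque;

/// Round-robin poll over per-partition message queues. Each polled
/// queue is rotated to the back; we give every queue at most one turn
/// per call, so an empty source returns None instead of looping.
fn poll_round_robin(
    partitions: &mut VecDeque<VecDeque<&'static str>>,
) -> Option<&'static str> {
    for _ in 0..partitions.len() {
        let mut pc = partitions.pop_front()?;
        let msg = pc.pop_front();
        // Rotate the queue to the back regardless of the outcome, to
        // keep the polling order fair.
        partitions.push_back(pc);
        if msg.is_some() {
            return msg;
        }
    }
    None
}
```

With two partitions holding `["p0-a", "p0-b"]` and `["p1-a"]`, successive calls alternate between partitions rather than draining one queue before touching the next.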
Trait Implementations
fn new(
source_name: String,
source_id: SourceInstanceId,
worker_id: usize,
worker_count: usize,
consumer_activator: SyncActivator,
connector: ExternalSourceConnector,
restored_offsets: Vec<(PartitionId, Option<MzOffset>)>,
_: SourceDataEncoding,
logger: Option<Logger>,
_: SourceBaseMetrics
) -> Result<(KafkaSourceReader, Option<PartitionId>), Error>
Create a new instance of a Kafka reader.
Returns the next message available from the source.
Auto Trait Implementations
impl !RefUnwindSafe for KafkaSourceReader
impl !Send for KafkaSourceReader
impl !Sync for KafkaSourceReader
impl Unpin for KafkaSourceReader
impl !UnwindSafe for KafkaSourceReader
Blanket Implementations
Mutably borrows from an owned value.
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.