kafka.KafkaConsumer(*topics, **configs)
Consume records from a Kafka cluster.
The consumer will transparently handle the failure of servers in the Kafka cluster, and adapt as topic-partitions are created or migrate between brokers. It also interacts with the assigned kafka Group Coordinator node to allow multiple consumers to load balance consumption of topics (requires kafka >= 0.9.0.0).
The consumer is not thread safe and should not be shared across threads.
Parameters:
*topics (str) – optional list of topics to subscribe to. If not set, call subscribe() or assign() before consuming records.
max_poll_records (int) – The maximum number of records returned in a single call to poll(). Default: 500
max_poll_interval_ms (int) – The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. Default: 300000
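As a rough worked example of how the poll timeout above interacts with the per-poll record limit: if every record returned by one poll() is processed before the next call, the average time available per record is the timeout divided by the record limit. The helper name below is hypothetical, and the defaults assume the two settings are max_poll_interval_ms (300000) and max_poll_records (500).

```python
def per_record_budget_ms(max_poll_interval_ms=300000, max_poll_records=500):
    """Average processing time available per record, in milliseconds,
    before the next poll() call would be overdue."""
    if max_poll_records <= 0:
        raise ValueError("max_poll_records must be positive")
    return max_poll_interval_ms / max_poll_records


print(per_record_budget_ms())            # 600.0 with the defaults assumed above
print(per_record_budget_ms(300000, 50))  # 6000.0 with smaller batches
```

If per-record processing can exceed this budget, lowering the record limit or raising the timeout are the two obvious knobs.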
api_version (tuple) – Specify which Kafka API version to use. If set to None, the client will attempt to infer the broker version by probing various APIs. Different versions enable different functionality.
Examples
Default: None
assign(partitions)
Warning
It is not possible to use both manual partition assignment with assign() and group assignment with subscribe().
This interface does not support incremental assignment and will replace the previous assignment (if there was one).
Manual topic assignment through this method does not use the consumer’s group management functionality. As such, there will be no rebalance operation triggered when group membership or cluster and topic metadata change.
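A minimal sketch of manual assignment, assuming the caller wants every partition of one topic and that partitions_for_topic() returns a set of partition ids (or None when metadata is unavailable). The helper name is hypothetical, and the namedtuple fallback exists only so the sketch runs without kafka-python installed.

```python
from collections import namedtuple

try:
    from kafka import TopicPartition
except ImportError:
    # Stand-in with the same two fields, used only when kafka-python is absent.
    TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def assign_whole_topic(consumer, topic):
    """Manually assign every known partition of `topic` to `consumer`."""
    partition_ids = consumer.partitions_for_topic(topic)
    if not partition_ids:
        raise ValueError("no partition metadata for topic %r" % topic)
    partitions = [TopicPartition(topic, p) for p in sorted(partition_ids)]
    # assign() is not incremental: this replaces any previous assignment.
    consumer.assign(partitions)
    return partitions
```

Because no group management is involved, the assignment stays fixed until assign() is called again, even if partitions are added to the topic later.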
assignment()
Get the TopicPartitions currently assigned to this consumer.
If partitions were directly assigned using assign(), then this will simply return the same partitions that were previously assigned. If topics were subscribed using subscribe(), then this will give the set of topic partitions currently assigned to the consumer (which may be None if the assignment hasn’t happened yet, or if the partitions are in the process of being reassigned).
beginning_offsets(partitions)
Get the first offset for the given partitions.
This method does not change the current consumer position of the partitions.
This method may block indefinitely if the partition does not exist.
Raises:
UnsupportedVersionError – If the broker does not support looking up the offsets by timestamp.
KafkaTimeoutError – If fetch failed in request_timeout_ms.
commit(offsets=None)
Commit offsets to kafka, blocking until success or error.
This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. To avoid re-processing the last message read if a consumer is restarted, the committed offset should be the next message your application should consume, i.e.: last_offset + 1.
Blocks until either the commit succeeds or an unrecoverable error is encountered (in which case it is thrown to the caller).
Currently only supports kafka-topic offset storage (not zookeeper).
Parameters: offsets (dict, optional) – {TopicPartition: OffsetAndMetadata} dict to commit with the configured group_id. Defaults to currently consumed offsets for all subscribed partitions.
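The last_offset + 1 rule can be sketched as a small helper that computes, per partition, the offset of the next record the application should consume. This assumes each record exposes topic, partition and offset attributes; the helper name is hypothetical. Turning the result into the {TopicPartition: OffsetAndMetadata} dict that commit() expects is left to the caller, since this sketch makes no assumption about how OffsetAndMetadata is constructed.

```python
def next_offsets(records):
    """Map (topic, partition) -> offset of the next record to consume,
    i.e. the highest processed offset in that partition plus one."""
    result = {}
    for record in records:
        key = (record.topic, record.partition)
        result[key] = max(result.get(key, 0), record.offset + 1)
    return result
```

Committing the raw last offset instead would cause the final processed record to be delivered again after a restart.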
commit_async(offsets=None, callback=None)
Commit offsets to kafka asynchronously, optionally firing callback.
This commits offsets only to Kafka. The offsets committed using this API will be used on the first fetch after every rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API should not be used. To avoid re-processing the last message read if a consumer is restarted, the committed offset should be the next message your application should consume, i.e.: last_offset + 1.
This is an asynchronous call and will not block. Any errors encountered are either passed to the callback (if provided) or discarded.
Returns: kafka.future.Future
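Since errors are discarded when no callback is given, a callback is the only place to observe a failed asynchronous commit. The sketch below assumes the callback is invoked as callback(offsets, response), with response being either an exception or the broker's commit response; the function and list names are hypothetical.

```python
import logging

log = logging.getLogger("offset-commit")
failed_commits = []  # (offsets, error) pairs kept for later inspection


def on_commit(offsets, response):
    """Record failed commits; successful ones are only logged at debug level."""
    if isinstance(response, Exception):
        failed_commits.append((offsets, response))
        log.error("commit of %s failed: %r", offsets, response)
    else:
        log.debug("commit of %s acknowledged", offsets)


# Usage, given an existing consumer:
#     consumer.commit_async(callback=on_commit)
```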
committed(partition, metadata=False)
Get the last committed offset for the given partition.
This offset will be used as the position for the consumer in the event of a failure.
This call may block to do a remote call if the partition in question isn’t assigned to this consumer or if the consumer hasn’t yet initialized its cache of committed offsets.
Returns: The last committed offset (int or OffsetAndMetadata), or None if there was no prior commit.
end_offsets(partitions)
Get the last offset for the given partitions. The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
This method does not change the current consumer position of the partitions.
This method may block indefinitely if the partition does not exist.
Raises:
UnsupportedVersionError – If the broker does not support looking up the offsets by timestamp.
KafkaTimeoutError – If fetch failed in request_timeout_ms.
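Because the end offset is one past the last available message, subtracting the beginning offset gives the span of offsets currently retained in a partition. A minimal sketch, assuming both methods return a dict keyed by the partitions passed in (the helper name is hypothetical):

```python
def offset_spans(consumer, partitions):
    """Return {partition: end_offset - beginning_offset}.

    The span equals the number of retained records only when offsets are
    contiguous; compaction, for example, can leave gaps."""
    partitions = list(partitions)
    first = consumer.beginning_offsets(partitions)
    last = consumer.end_offsets(partitions)
    return {tp: last[tp] - first[tp] for tp in partitions}
```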
highwater(partition)
Last known highwater offset for a partition.
A highwater offset is the offset that will be assigned to the next message that is produced. It may be useful for calculating lag, by comparing with the reported position. Note that both position and highwater refer to the next offset – i.e., highwater offset is one greater than the newest available message.
Highwater offsets are returned in FetchResponse messages, so will not be available if no FetchRequests have been sent for this partition.
Parameters:
partition (TopicPartition) – Partition to check
Returns: Offset if available
Return type: int or None
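The lag calculation mentioned above can be sketched as follows. It assumes the consumer also exposes a position(partition) method returning the next offset to be fetched, which is not documented in this section; the helper name is hypothetical.

```python
def partition_lag(consumer, partition):
    """Return how many offsets the consumer is behind the highwater mark,
    or None when either value is not yet known."""
    highwater = consumer.highwater(partition)
    if highwater is None:
        # No FetchResponse has been received for this partition yet.
        return None
    position = consumer.position(partition)
    if position is None:
        return None
    # Both values refer to the "next" offset, so they subtract directly.
    return max(0, highwater - position)
```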
metrics(raw=False)
Get metrics on consumer performance.
This is ported from the Java Consumer, for details see: https://kafka.apache.org/documentation/#consumer_monitoring
Warning
This is an unstable interface. It may change in future releases without warning.
offsets_for_times(timestamps)
Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
This is a blocking call. The consumer does not have to be assigned the partitions.
If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, None will be returned for that partition. None will also be returned for the partition if there are no messages in it.
This method may block indefinitely if the partition does not exist.
Parameters:
timestamps (dict) – {TopicPartition: int} mapping from partition to the timestamp to look up. Unit should be milliseconds since beginning of the epoch (midnight Jan 1, 1970 (UTC)).
Returns: {TopicPartition: OffsetAndTimestamp} mapping from partition to the timestamp and offset of the first message with timestamp greater than or equal to the target timestamp.
Raises:
ValueError – If the target timestamp is negative
UnsupportedVersionError – If the broker does not support looking up the offsets by timestamp.
KafkaTimeoutError – If fetch failed in request_timeout_ms.
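A common use of this lookup is to rewind consumption to a point in time. The sketch below combines it with seek() and assumes each returned entry has an offset attribute and that the partitions are already assigned to the consumer; the helper name is hypothetical.

```python
def seek_to_timestamp(consumer, partitions, timestamp_ms):
    """Seek each partition to the first record at or after `timestamp_ms`.

    Returns the partitions for which no offset was found (None entries)."""
    partitions = list(partitions)
    found = consumer.offsets_for_times({tp: timestamp_ms for tp in partitions})
    skipped = []
    for tp in partitions:
        entry = found.get(tp)
        if entry is None:
            # No timestamps in this partition, or no matching messages.
            skipped.append(tp)
        else:
            consumer.seek(tp, entry.offset)
    return skipped
```

Partitions reported as skipped keep their current position, so the caller can decide whether to leave them alone or handle them separately.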
partitions_for_topic(topic)
This method first checks the local metadata cache for information about the topic. If the topic is not found (either because the topic does not exist, the user is not authorized to view the topic, or the metadata cache is not populated), then it will issue a metadata update call to the cluster.
pause(*partitions)
Suspend fetching from the requested partitions.
Future calls to poll() will not return any records from these partitions until they have been resumed using resume().
Note: This method does not affect partition subscription. In particular, it does not cause a group rebalance when automatic assignment is used.
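One way to use pause() and resume() is simple backpressure: stop fetching while a downstream buffer is full and start again once it drains. This is a sketch only; the helper name, the backlog measure and both thresholds are hypothetical.

```python
def apply_backpressure(consumer, partitions, backlog, high=1000, low=100):
    """Pause fetching when `backlog` reaches `high`; resume at or below `low`.

    Returns "paused", "resumed", or None when nothing was changed."""
    partitions = list(partitions)
    if backlog >= high:
        consumer.pause(*partitions)
        return "paused"
    if backlog <= low:
        consumer.resume(*partitions)
        return "resumed"
    return None
```

The gap between the two thresholds avoids toggling on every call when the backlog hovers around a single limit.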
poll(timeout_ms=0, max_records=None, update_offsets=True)
Fetch data from assigned topics / partitions.
Records are fetched and returned in batches by topic-partition. On each poll, the consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek() or automatically set as the last committed offset for the subscribed list of partitions.
Incompatible with iterator interface – use one or the other, not both.
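Parameters:
max_records (int, optional) – The maximum number of records returned in a single call to poll(). Default: Inherit value from max_poll_records.
Returns: Topic to list of records since the last fetch for the subscribed list of topics and partitions.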
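A minimal sketch of handling one poll() result, assuming the return value is a dict mapping each topic-partition to a list of records. The helper and the handle callback are hypothetical.

```python
def drain_once(consumer, handle, timeout_ms=1000, max_records=None):
    """Poll once and pass every returned record to `handle(partition, record)`.

    Returns the number of records handled."""
    batches = consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
    count = 0
    for partition, records in batches.items():
        for record in records:
            handle(partition, record)
            count += 1
    return count
```

Calling this in a loop replaces the iterator interface; as noted above, the two should not be mixed on the same consumer.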
seek(partition, offset)
Manually specify the fetch offset for a TopicPartition.
Overrides the fetch offsets that the consumer will use on the next poll(). If this API is invoked for the same partition more than once, the latest offset will be used on the next poll().
Note: You may lose data if this API is arbitrarily used in the middle of consumption to reset the fetch offsets.
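As an illustration of a deliberate rewind, the sketch below moves the fetch offset back by a fixed number of offsets without going below the first available one. It assumes a position(partition) method (not documented in this section) returning the next offset to fetch; the helper name is hypothetical.

```python
def rewind(consumer, partition, count):
    """Move the fetch offset of `partition` back by `count` offsets,
    clamped to the first offset still available. Returns the new offset."""
    earliest = consumer.beginning_offsets([partition])[partition]
    target = max(earliest, consumer.position(partition) - count)
    consumer.seek(partition, target)
    return target
```

Rewinding causes records to be delivered again, so downstream processing needs to tolerate duplicates.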
seek_to_beginning(*partitions)
Parameters:
*partitions – Optionally provide specific TopicPartitions, otherwise default to all assigned partitions.
Raises:
AssertionError – If any partition is not currently assigned, or if no partitions are assigned.
seek_to_end(*partitions)
Parameters:
*partitions – Optionally provide specific TopicPartitions, otherwise default to all assigned partitions.
Raises:
AssertionError – If any partition is not currently assigned, or if no partitions are assigned.
subscribe(topics=(), pattern=None, listener=None)
Subscribe to a list of topics, or a topic regex pattern.
Partitions will be dynamically assigned via a group coordinator. Topic subscriptions are not incremental: this list will replace the current assignment (if there is one).
This method is incompatible with assign().
Optionally include listener callback, which will be called before and after each rebalance operation.
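As part of group management, the consumer will keep track of the list of consumers that belong to a particular group and will trigger a rebalance operation if one of the following events occurs:
Number of partitions change for any of the subscribed topics
Topic is created or deleted
An existing member of the consumer group dies
A new member is added to the consumer group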
When any of these events are triggered, the provided listener will be invoked first to indicate that the consumer’s assignment has been revoked, and then again when the new assignment has been received. Note that this listener will immediately override any listener set in a previous call to subscribe. It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics subscribed in this call.
Raises:
IllegalStateError – If called after previously calling assign().
AssertionError – If neither topics nor pattern is provided.
TypeError – If listener is not a ConsumerRebalanceListener.
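A rebalance listener might look like the sketch below, which commits before partitions are taken away and tracks the current assignment. It assumes ConsumerRebalanceListener can be imported from the kafka package and defines on_partitions_revoked(revoked) and on_partitions_assigned(assigned); the class name and the fallback base class are hypothetical, the latter existing only so the sketch runs without kafka-python installed.

```python
try:
    from kafka import ConsumerRebalanceListener
except ImportError:
    class ConsumerRebalanceListener(object):
        """Stand-in base class, used only when kafka-python is absent."""


class CommitOnRevoke(ConsumerRebalanceListener):
    """Commit consumed offsets before a rebalance and track owned partitions."""

    def __init__(self, consumer):
        self.consumer = consumer
        self.owned = set()

    def on_partitions_revoked(self, revoked):
        if revoked:
            # Blocking commit, so the next owner resumes from the right place.
            self.consumer.commit()
        self.owned -= set(revoked)

    def on_partitions_assigned(self, assigned):
        self.owned = set(assigned)


# Usage, given an existing consumer and a hypothetical topic name:
#     consumer.subscribe(topics=["events"], listener=CommitOnRevoke(consumer))
```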
topics()
Get all topics the user is authorized to view. This will always issue a remote call to the cluster to fetch the latest information.