
Commit

Add internal update_offsets param to consumer poll(); default to new iterator

dpkp committed Sep 28, 2019
1 parent 216e623 commit 2f713cf
Showing 1 changed file with 13 additions and 7 deletions.
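
What the change does, in brief: poll() gains an internal-only update_offsets parameter that is passed down to the fetcher, and the default iterator ('legacy_iterator': False) uses it to defer offset bookkeeping until each record is actually yielded; callers of the public poll() API see no behavior change. A minimal sketch of the two consumption styles involved (the broker address and topic name below are placeholders, not part of the commit):

from kafka import KafkaConsumer

# Placeholder broker/topic for illustration only.
consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092')

# Explicit poll(): unchanged by this commit; the fetcher's tracked offsets
# advance per returned batch, as before.
batch = consumer.poll(timeout_ms=1000)
for tp, records in batch.items():
    for record in records:
        print(tp.partition, record.offset, record.value)

# Iterator interface: with the new default, this path wraps
# poll(..., update_offsets=False) internally, so fetch positions advance
# only as each record is yielded.
for record in consumer:
    print(record.partition, record.offset, record.value)
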
20 changes: 13 additions & 7 deletions kafka/consumer/group.py
@@ -303,7 +303,7 @@ class KafkaConsumer(six.Iterator):
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None,
- 'legacy_iterator': True, # experimental feature
+ 'legacy_iterator': False, # enable to revert to < 1.4.7 iterator
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000

@@ -598,7 +598,7 @@ def partitions_for_topic(self, topic):
partitions = cluster.partitions_for_topic(topic)
return partitions

- def poll(self, timeout_ms=0, max_records=None):
+ def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
"""Fetch data from assigned topics / partitions.
Records are fetched and returned in batches by topic-partition.
@@ -622,6 +622,12 @@ def poll(self, timeout_ms=0, max_records=None):
dict: Topic to list of records since the last fetch for the
subscribed list of topics and partitions.
"""
+ # Note: update_offsets is an internal-use only argument. It is used to
+ # support the python iterator interface, which wraps consumer.poll()
+ # and requires that the partition offsets tracked by the fetcher are not
+ # updated until the iterator returns each record to the user. As such,
+ # the argument is not documented and library users should not rely on it,
+ # as it may change or be removed in the future.
assert timeout_ms >= 0, 'Timeout must not be negative'
if max_records is None:
max_records = self.config['max_poll_records']
@@ -632,7 +638,7 @@ def poll(self, timeout_ms=0, max_records=None):
start = time.time()
remaining = timeout_ms
while True:
- records = self._poll_once(remaining, max_records)
+ records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
if records:
return records

@@ -642,7 +648,7 @@ def poll(self, timeout_ms=0, max_records=None):
if remaining <= 0:
return {}

- def _poll_once(self, timeout_ms, max_records):
+ def _poll_once(self, timeout_ms, max_records, update_offsets=True):
"""Do one round of polling. In addition to checking for new data, this does
any needed heart-beating, auto-commits, and offset updates.
@@ -661,7 +667,7 @@ def _poll_once(self, timeout_ms, max_records):

# If data is available already, e.g. from a previous network client
# poll() call to commit, then just return it immediately
- records, partial = self._fetcher.fetched_records(max_records, update_offsets=bool(self._iterator))
+ records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
if records:
# Before returning the fetched records, we can send off the
# next round of fetches and avoid block waiting for their
@@ -681,7 +687,7 @@ def _poll_once(self, timeout_ms, max_records):
if self._coordinator.need_rejoin():
return {}

- records, _ = self._fetcher.fetched_records(max_records, update_offsets=bool(self._iterator))
+ records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
return records

def position(self, partition):
@@ -1089,7 +1095,7 @@ def _update_fetch_positions(self, partitions):

def _message_generator_v2(self):
timeout_ms = 1000 * (self._consumer_timeout - time.time())
- record_map = self.poll(timeout_ms=timeout_ms)
+ record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
for tp, records in six.iteritems(record_map):
# Generators are stateful, and it is possible that the tp / records
# here may become stale during iteration -- i.e., we seek to a

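For context, the new (non-legacy) iterator path drains poll() without letting the fetcher advance its tracked offsets, then moves the consumer's position forward one record at a time as records are handed back, which is what _message_generator_v2 above relies on. The sketch below is illustrative only and not the library's code: it calls the internal-use update_offsets argument from the outside purely to show the flow (the comment in poll() warns library users not to depend on it), and it uses the public seek() method to stand in for the internal per-record position bookkeeping.

def iterate_once(consumer, timeout_ms=1000):
    # Hypothetical helper mirroring the flow of _message_generator_v2.
    # Poll without advancing the fetcher's tracked offsets.
    record_map = consumer.poll(timeout_ms=timeout_ms, update_offsets=False)
    for tp, records in record_map.items():
        for record in records:
            # Advance the position only as each record is yielded; seek()
            # approximates the internal position update.
            consumer.seek(tp, record.offset + 1)
            yield record

Per the DEFAULT_CONFIG change above, constructing the consumer with legacy_iterator=True should revert to the pre-1.4.7 iterator behavior.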