From f599aa4df99a9e9b8c229b3b527e271c110a1c26 Mon Sep 17 00:00:00 2001 From: Varun Chawla Date: Tue, 10 Feb 2026 00:11:50 -0800 Subject: [PATCH 1/2] Use time.monotonic() instead of time.time() for elapsed time calculations Replace all uses of time.time() with time.monotonic() for measuring elapsed time, timeouts, and latency across the library. time.time() returns wall-clock time (seconds since the epoch), which is subject to NTP adjustments and manual clock changes. This can cause incorrect timeout behavior -- including premature timeouts or infinite hangs -- when the system clock jumps forward or backward. time.monotonic() provides a clock that cannot go backwards and is unaffected by system clock updates, making it the correct choice for measuring time intervals. The only intentional exception is kafka/record/legacy_records.py, which uses time.time() to generate Kafka message timestamps that require real wall clock time. Fixes #1546 Co-Authored-By: Claude Opus 4.6 --- kafka/admin/client.py | 4 +-- kafka/benchmarks/consumer_performance.py | 4 +-- kafka/benchmarks/producer_performance.py | 4 +-- kafka/client_async.py | 32 ++++++++++++------------ kafka/cluster.py | 6 ++--- kafka/conn.py | 28 ++++++++++----------- kafka/consumer/fetcher.py | 10 ++++---- kafka/consumer/group.py | 6 ++--- kafka/consumer/subscription_state.py | 2 +- kafka/coordinator/base.py | 12 ++++----- kafka/coordinator/consumer.py | 20 +++++++-------- kafka/coordinator/heartbeat.py | 18 ++++++------- kafka/metrics/kafka_metric.py | 2 +- kafka/metrics/stats/sensor.py | 6 ++--- kafka/producer/producer_batch.py | 8 +++--- kafka/producer/record_accumulator.py | 8 +++--- kafka/producer/sender.py | 6 ++--- kafka/util.py | 8 +++--- test/test_client_async.py | 14 +++++------ test/test_conn.py | 8 +++--- test/test_coordinator.py | 2 +- test/test_fetcher.py | 2 +- test/test_metrics.py | 16 ++++++------ test/test_sender.py | 4 +-- test/testutil.py | 4 +-- 25 files changed, 117 insertions(+), 117 deletions(-) diff --git a/kafka/admin/client.py 
b/kafka/admin/client.py index 6b57091d5..0319127b8 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -256,8 +256,8 @@ def _refresh_controller_id(self, timeout_ms=30000): # use defaults for allow_auto_topic_creation / include_authorized_operations in v6+ request = MetadataRequest[version]() - timeout_at = time.time() + timeout_ms / 1000 - while time.time() < timeout_at: + timeout_at = time.monotonic() + timeout_ms / 1000 + while time.monotonic() < timeout_at: response = self.send_request(request) controller_id = response.controller_id if controller_id == -1: diff --git a/kafka/benchmarks/consumer_performance.py b/kafka/benchmarks/consumer_performance.py index e76264145..f1532f346 100644 --- a/kafka/benchmarks/consumer_performance.py +++ b/kafka/benchmarks/consumer_performance.py @@ -52,14 +52,14 @@ def run(args): print('-> OK!') print() - start_time = time.time() + start_time = time.monotonic() records = 0 for msg in consumer: records += 1 if records >= args.num_records: break - end_time = time.time() + end_time = time.monotonic() timer_stop.set() timer.join() print('Consumed {0} records'.format(records)) diff --git a/kafka/benchmarks/producer_performance.py b/kafka/benchmarks/producer_performance.py index df7c51f29..3ab81ffea 100644 --- a/kafka/benchmarks/producer_performance.py +++ b/kafka/benchmarks/producer_performance.py @@ -66,9 +66,9 @@ def _benchmark(): raise ValueError(r) print("%d suceeded, %d failed" % (count_success, count_failure)) - start_time = time.time() + start_time = time.monotonic() _benchmark() - end_time = time.time() + end_time = time.monotonic() timer_stop.set() timer.join() print('Execution time:', end_time - start_time, 'secs') diff --git a/kafka/client_async.py b/kafka/client_async.py index 734efb65b..9a89ba20c 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -304,7 +304,7 @@ def _conn_state_change(self, node_id, sock, conn): self._selector.modify(sock, selectors.EVENT_WRITE, conn) if 
self.cluster.is_bootstrap(node_id): - self._last_bootstrap = time.time() + self._last_bootstrap = time.monotonic() elif conn.state is ConnectionStates.API_VERSIONS_SEND: try: @@ -708,9 +708,9 @@ def _poll(self, timeout): # Send pending requests first, before polling for responses self._register_send_sockets() - start_select = time.time() + start_select = time.monotonic() ready = self._selector.select(timeout) - end_select = time.time() + end_select = time.monotonic() if self._sensors: self._sensors.select_time.record((end_select - start_select) * 1000000000) @@ -782,7 +782,7 @@ def _poll(self, timeout): timeout_ms)) if self._sensors: - self._sensors.io_time.record((time.time() - end_select) * 1000000000) + self._sensors.io_time.record((time.monotonic() - end_select) * 1000000000) self._maybe_close_oldest_connection() @@ -1020,9 +1020,9 @@ def check_version(self, node_id=None, timeout=None, **kwargs): """ timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000) with self._lock: - end = time.time() + timeout - while time.time() < end: - time_remaining = max(end - time.time(), 0) + end = time.monotonic() + timeout + while time.monotonic() < end: + time_remaining = max(end - time.monotonic(), 0) if node_id is not None and self.connection_delay(node_id) > 0: sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0) if sleep_time > 0: @@ -1043,8 +1043,8 @@ def check_version(self, node_id=None, timeout=None, **kwargs): continue conn = self._conns[try_node] - while conn.connecting() and time.time() < end: - timeout_ms = min((end - time.time()) * 1000, 200) + while conn.connecting() and time.monotonic() < end: + timeout_ms = min((end - time.monotonic()) * 1000, 200) self.poll(timeout_ms=timeout_ms) if conn._api_version is not None: @@ -1130,7 +1130,7 @@ def _maybe_close_oldest_connection(self): expired_connection = self._idle_expiry_manager.poll_expired_connection() if expired_connection: conn_id, ts = expired_connection - idle_ms = 
(time.time() - ts) * 1000 + idle_ms = (time.monotonic() - ts) * 1000 log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms) self.close(node_id=conn_id) @@ -1185,14 +1185,14 @@ def __init__(self, connections_max_idle_ms): else: self.connections_max_idle = float('inf') self.next_idle_close_check_time = None - self.update_next_idle_close_check_time(time.time()) + self.update_next_idle_close_check_time(time.monotonic()) self.lru_connections = collections.OrderedDict() def update(self, conn_id): # order should reflect last-update if conn_id in self.lru_connections: del self.lru_connections[conn_id] - self.lru_connections[conn_id] = time.time() + self.lru_connections[conn_id] = time.monotonic() def remove(self, conn_id): if conn_id in self.lru_connections: @@ -1201,10 +1201,10 @@ def remove(self, conn_id): def is_expired(self, conn_id): if conn_id not in self.lru_connections: return None - return time.time() >= self.lru_connections[conn_id] + self.connections_max_idle + return time.monotonic() >= self.lru_connections[conn_id] + self.connections_max_idle def next_check_ms(self): - now = time.time() + now = time.monotonic() if not self.lru_connections or self.next_idle_close_check_time == float('inf'): return float('inf') elif self.next_idle_close_check_time <= now: @@ -1216,7 +1216,7 @@ def update_next_idle_close_check_time(self, ts): self.next_idle_close_check_time = ts + self.connections_max_idle def poll_expired_connection(self): - if time.time() < self.next_idle_close_check_time: + if time.monotonic() < self.next_idle_close_check_time: return None if not len(self.lru_connections): @@ -1228,7 +1228,7 @@ def poll_expired_connection(self): self.update_next_idle_close_check_time(oldest_ts) - if time.time() >= oldest_ts + self.connections_max_idle: + if time.monotonic() >= oldest_ts + self.connections_max_idle: return (oldest_conn_id, oldest_ts) else: return None diff --git a/kafka/cluster.py b/kafka/cluster.py index 09832fde2..98ddd23da 100644 --- 
a/kafka/cluster.py +++ b/kafka/cluster.py @@ -169,7 +169,7 @@ def coordinator_for_group(self, group): def ttl(self): """Milliseconds until metadata should be refreshed""" - now = time.time() * 1000 + now = time.monotonic() * 1000 if self._need_update: ttl = 0 else: @@ -231,7 +231,7 @@ def failed_update(self, exception): self._future = None if f: f.failure(exception) - self._last_refresh_ms = time.time() * 1000 + self._last_refresh_ms = time.monotonic() * 1000 def update_metadata(self, metadata): """Update cluster state given a MetadataResponse. @@ -335,7 +335,7 @@ def update_metadata(self, metadata): self._future = None self._need_update = False - now = time.time() * 1000 + now = time.monotonic() * 1000 self._last_refresh_ms = now self._last_successful_refresh_ms = now diff --git a/kafka/conn.py b/kafka/conn.py index bfabde407..80c2120b8 100755 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -326,7 +326,7 @@ def _next_afi_sockaddr(self): def connect_blocking(self, timeout=float('inf')): if self.connected(): return True - timeout += time.time() + timeout += time.monotonic() # First attempt to perform dns lookup # note that the underlying interface, socket.getaddrinfo, # has no explicit timeout so we may exceed the user-specified timeout @@ -335,7 +335,7 @@ def connect_blocking(self, timeout=float('inf')): # Loop once over all returned dns entries selector = None while self._gai: - while time.time() < timeout: + while time.monotonic() < timeout: self.connect() if self.connected(): if selector is not None: @@ -359,7 +359,7 @@ def connect(self): """Attempt to connect and return ConnectionState""" if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out(): self.state = ConnectionStates.CONNECTING - self.last_attempt = time.time() + self.last_attempt = time.monotonic() next_lookup = self._next_afi_sockaddr() if not next_lookup: self.close(Errors.KafkaConnectionError('DNS failure')) @@ -464,7 +464,7 @@ def connect(self): ConnectionStates.DISCONNECTED): # 
Connection timed out request_timeout = self.config['request_timeout_ms'] / 1000.0 - if time.time() > request_timeout + self.last_attempt: + if time.monotonic() > request_timeout + self.last_attempt: log.error('%s: Connection attempt timed out', self) self.close(Errors.KafkaConnectionError('timeout')) return self.state @@ -776,7 +776,7 @@ def _recv_sasl_authenticate(self): if version == 1: ((correlation_id, response),) = self._protocol.receive_bytes(data) (future, timestamp, _timeout) = self.in_flight_requests.pop(correlation_id) - latency_ms = (time.time() - timestamp) * 1000 + latency_ms = (time.monotonic() - timestamp) * 1000 if self._sensors: self._sensors.request_time.record(latency_ms) log.debug('%s: Response %d (%s ms): %s', self, correlation_id, latency_ms, response) @@ -834,7 +834,7 @@ def throttle_delay(self): Return the number of milliseconds to wait until connection is no longer throttled. """ if self._throttle_time is not None: - remaining_ms = (self._throttle_time - time.time()) * 1000 + remaining_ms = (self._throttle_time - time.monotonic()) * 1000 if remaining_ms > 0: return remaining_ms else: @@ -855,7 +855,7 @@ def connection_delay(self): elif self.config["socks5_proxy"] and Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]): return 0 else: - time_waited = time.time() - self.last_attempt + time_waited = time.monotonic() - self.last_attempt return max(self._reconnect_backoff - time_waited, 0) * 1000 else: # When connecting or connected, we should be able to delay @@ -1006,7 +1006,7 @@ def _send(self, request, blocking=True, request_timeout_ms=None): log.debug('%s: Request %d (timeout_ms %s): %s', self, correlation_id, request_timeout_ms, request) if request.expect_response(): assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!' 
- sent_time = time.time() + sent_time = time.monotonic() timeout_at = sent_time + (request_timeout_ms / 1000) self.in_flight_requests[correlation_id] = (future, sent_time, timeout_at) else: @@ -1093,7 +1093,7 @@ def _maybe_throttle(self, response): # Client side throttling enabled in v2.0 brokers # prior to that throttling (if present) was managed broker-side if self.config['api_version'] is not None and self.config['api_version'] >= (2, 0): - throttle_time = time.time() + throttle_time_ms / 1000 + throttle_time = time.monotonic() + throttle_time_ms / 1000 self._throttle_time = max(throttle_time, self._throttle_time or 0) log.warning("%s: %s throttled by broker (%d ms)", self, response.__class__.__name__, throttle_time_ms) @@ -1129,7 +1129,7 @@ def recv(self): except KeyError: self.close(Errors.KafkaConnectionError('Received unrecognized correlation id')) return () - latency_ms = (time.time() - timestamp) * 1000 + latency_ms = (time.monotonic() - timestamp) * 1000 if self._sensors: self._sensors.request_time.record(latency_ms) @@ -1193,7 +1193,7 @@ def requests_timed_out(self): return self.next_ifr_request_timeout_ms() == 0 def timed_out_ifrs(self): - now = time.time() + now = time.monotonic() ifrs = sorted(self.in_flight_requests.values(), reverse=True, key=lambda ifr: ifr[2]) return list(filter(lambda ifr: ifr[2] <= now, ifrs)) @@ -1204,7 +1204,7 @@ def get_timeout(v): return v[2] next_timeout = min(map(get_timeout, self.in_flight_requests.values())) - return max(0, (next_timeout - time.time()) * 1000) + return max(0, (next_timeout - time.monotonic()) * 1000) else: return float('inf') @@ -1275,8 +1275,8 @@ def check_version(self, timeout=2, **kwargs): Raises: NodeNotReadyError on timeout """ - timeout_at = time.time() + timeout - if not self.connect_blocking(timeout_at - time.time()): + timeout_at = time.monotonic() + timeout + if not self.connect_blocking(timeout_at - time.monotonic()): raise Errors.NodeNotReadyError() else: return self._api_version diff --git 
a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 8c2a41ba0..d3a600c6f 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -147,7 +147,7 @@ def send_fetches(self): log.debug("Sending FetchRequest to node %s", node_id) self._nodes_with_pending_fetch_requests.add(node_id) future = self._client.send(node_id, request, wakeup=False) - future.add_callback(self._handle_fetch_response, node_id, fetch_offsets, time.time()) + future.add_callback(self._handle_fetch_response, node_id, fetch_offsets, time.monotonic()) future.add_errback(self._handle_fetch_error, node_id) future.add_both(self._clear_pending_fetch_request, node_id) futures.append(future) @@ -421,13 +421,13 @@ def _reset_offsets_async(self, timestamps): if not self._client.ready(node_id): continue partitions = set(timestamps_and_epochs.keys()) - expire_at = time.time() + self.config['request_timeout_ms'] / 1000 + expire_at = time.monotonic() + self.config['request_timeout_ms'] / 1000 self._subscriptions.set_reset_pending(partitions, expire_at) def on_success(timestamps_and_epochs, result): fetched_offsets, partitions_to_retry = result if partitions_to_retry: - self._subscriptions.reset_failed(partitions_to_retry, time.time() + self.config['retry_backoff_ms'] / 1000) + self._subscriptions.reset_failed(partitions_to_retry, time.monotonic() + self.config['retry_backoff_ms'] / 1000) self._client.cluster.request_update() for partition, offset in fetched_offsets.items(): @@ -435,7 +435,7 @@ def on_success(timestamps_and_epochs, result): self._reset_offset_if_needed(partition, ts, offset.offset) def on_failure(partitions, error): - self._subscriptions.reset_failed(partitions, time.time() + self.config['retry_backoff_ms'] / 1000) + self._subscriptions.reset_failed(partitions, time.monotonic() + self.config['retry_backoff_ms'] / 1000) self._client.cluster.request_update() if not getattr(error, 'retriable', False): @@ -778,7 +778,7 @@ def _handle_fetch_response(self, node_id, fetch_offsets, 
send_time, response): self._completed_fetches.append(completed_fetch) if self._sensors: - self._sensors.fetch_latency.record((time.time() - send_time) * 1000) + self._sensors.fetch_latency.record((time.monotonic() - send_time) * 1000) def _handle_fetch_error(self, node_id, exception): level = logging.INFO if isinstance(exception, Errors.Cancelled) else logging.ERROR diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 7c8b1cb2f..e3c2672c3 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -1176,7 +1176,7 @@ def _update_fetch_positions(self, timeout_ms=None): return not self._fetcher.reset_offsets_if_needed() def _message_generator_v2(self): - timeout_ms = 1000 * max(0, self._consumer_timeout - time.time()) + timeout_ms = 1000 * max(0, self._consumer_timeout - time.monotonic()) record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False) for tp, records in record_map.items(): # Generators are stateful, and it is possible that the tp / records @@ -1201,7 +1201,7 @@ def __next__(self): if self._closed: raise StopIteration('KafkaConsumer closed') self._set_consumer_timeout() - while time.time() < self._consumer_timeout: + while time.monotonic() < self._consumer_timeout: if not self._iterator: self._iterator = self._message_generator_v2() try: @@ -1213,5 +1213,5 @@ def __next__(self): def _set_consumer_timeout(self): # consumer_timeout_ms can be used to stop iteration early if self.config['consumer_timeout_ms'] >= 0: - self._consumer_timeout = time.time() + ( + self._consumer_timeout = time.monotonic() + ( self.config['consumer_timeout_ms'] / 1000.0) diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 578e70a07..32a2ae4ce 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -462,7 +462,7 @@ def reset(self, strategy): self.next_allowed_retry_time = None def is_reset_allowed(self): - return self.next_allowed_retry_time is None or 
self.next_allowed_retry_time < time.time() + return self.next_allowed_retry_time is None or self.next_allowed_retry_time < time.monotonic() @property def awaiting_reset(self): diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 12e7ac40b..5658e31cc 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -574,7 +574,7 @@ def _send_join_group_request(self): log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id) future = Future() _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_join_group_response, future, time.time()) + _f.add_callback(self._handle_join_group_response, future, time.monotonic()) _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future @@ -598,7 +598,7 @@ def _handle_join_group_response(self, future, send_time, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: if self._sensors: - self._sensors.join_latency.record((time.time() - send_time) * 1000) + self._sensors.join_latency.record((time.monotonic() - send_time) * 1000) with self._lock: if self.state is not MemberState.REBALANCING: # if the consumer was woken up before a rebalance completes, @@ -744,7 +744,7 @@ def _send_sync_group_request(self, request): future = Future() _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_sync_group_response, future, time.time()) + _f.add_callback(self._handle_sync_group_response, future, time.monotonic()) _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future @@ -754,7 +754,7 @@ def _handle_sync_group_response(self, future, send_time, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: if self._sensors: - self._sensors.sync_latency.record((time.time() - send_time) * 1000) + self._sensors.sync_latency.record((time.monotonic() - send_time) * 1000) 
future.success(response.member_assignment) return @@ -997,14 +997,14 @@ def _send_heartbeat_request(self): heartbeat_log.debug("Sending HeartbeatRequest to %s: %s", self.coordinator_id, request) future = Future() _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_heartbeat_response, future, time.time()) + _f.add_callback(self._handle_heartbeat_response, future, time.monotonic()) _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future def _handle_heartbeat_response(self, future, send_time, response): if self._sensors: - self._sensors.heartbeat_latency.record((time.time() - send_time) * 1000) + self._sensors.heartbeat_latency.record((time.monotonic() - send_time) * 1000) heartbeat_log.debug("Received heartbeat response for group %s: %s", self.group_id, response) error_type = Errors.for_code(response.error_code) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 58869d61a..25da9fe1b 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -127,7 +127,7 @@ def __init__(self, client, subscription, **configs): log.warning('group_id is None: disabling auto-commit.') self.config['enable_auto_commit'] = False else: - self.next_auto_commit_deadline = time.time() + self.auto_commit_interval + self.next_auto_commit_deadline = time.monotonic() + self.auto_commit_interval if self.config['metrics']: self._consumer_sensors = ConsumerCoordinatorMetrics( @@ -250,7 +250,7 @@ def _on_join_complete(self, generation, member_id, protocol, assignor.on_generation_assignment(generation) # reschedule the auto commit starting from now - self.next_auto_commit_deadline = time.time() + self.auto_commit_interval + self.next_auto_commit_deadline = time.monotonic() + self.auto_commit_interval assigned = set(self._subscription.assigned_partitions()) log.info("Setting newly assigned partitions %s for group %s", @@ -321,10 +321,10 @@ def time_to_next_poll(self): if not 
self.config['enable_auto_commit']: return self.time_to_next_heartbeat() - if time.time() > self.next_auto_commit_deadline: + if time.monotonic() > self.next_auto_commit_deadline: return 0 - return min(self.next_auto_commit_deadline - time.time(), + return min(self.next_auto_commit_deadline - time.monotonic(), self.time_to_next_heartbeat()) def _perform_assignment(self, leader_id, assignment_strategy, members): @@ -753,7 +753,7 @@ def _send_offset_commit_request(self, offsets): future = Future() _f = self._client.send(node_id, request) - _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time()) + _f.add_callback(self._handle_offset_commit_response, offsets, future, time.monotonic()) _f.add_errback(self._failed_request, node_id, request, future) return future @@ -761,7 +761,7 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response): log.debug("Received OffsetCommitResponse: %s", response) # TODO look at adding request_latency_ms to response (like java kafka) if self._consumer_sensors: - self._consumer_sensors.commit_latency.record((time.time() - send_time) * 1000) + self._consumer_sensors.commit_latency.record((time.monotonic() - send_time) * 1000) unauthorized_topics = set() for topic, partitions in response.topics: @@ -961,15 +961,15 @@ def _default_offset_commit_callback(self, offsets, res_or_exc): def _commit_offsets_async_on_complete(self, offsets, res_or_exc): if isinstance(res_or_exc, Exception) and getattr(res_or_exc, 'retriable', False): - self.next_auto_commit_deadline = min(time.time() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline) + self.next_auto_commit_deadline = min(time.monotonic() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline) self.config['default_offset_commit_callback'](offsets, res_or_exc) def _maybe_auto_commit_offsets_async(self): if self.config['enable_auto_commit']: if self.coordinator_unknown(): - self.next_auto_commit_deadline = 
time.time() + self.config['retry_backoff_ms'] / 1000 - elif time.time() > self.next_auto_commit_deadline: - self.next_auto_commit_deadline = time.time() + self.auto_commit_interval + self.next_auto_commit_deadline = time.monotonic() + self.config['retry_backoff_ms'] / 1000 + elif time.monotonic() > self.next_auto_commit_deadline: + self.next_auto_commit_deadline = time.monotonic() + self.auto_commit_interval self._do_auto_commit_offsets_async() def maybe_auto_commit_offsets_now(self): diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py index fb40c302b..4bf04d4ee 100644 --- a/kafka/coordinator/heartbeat.py +++ b/kafka/coordinator/heartbeat.py @@ -34,25 +34,25 @@ def __init__(self, **configs): self.last_send = -1 * float('inf') self.last_receive = -1 * float('inf') self.last_poll = -1 * float('inf') - self.last_reset = time.time() + self.last_reset = time.monotonic() self.heartbeat_failed = None def poll(self): - self.last_poll = time.time() + self.last_poll = time.monotonic() def sent_heartbeat(self): - self.last_send = time.time() + self.last_send = time.monotonic() self.heartbeat_failed = False def fail_heartbeat(self): self.heartbeat_failed = True def received_heartbeat(self): - self.last_receive = time.time() + self.last_receive = time.monotonic() def time_to_next_heartbeat(self): """Returns seconds (float) remaining before next heartbeat should be sent""" - time_since_last_heartbeat = time.time() - max(self.last_send, self.last_reset) + time_since_last_heartbeat = time.monotonic() - max(self.last_send, self.last_reset) if self.heartbeat_failed: delay_to_next_heartbeat = self.config['retry_backoff_ms'] / 1000 else: @@ -64,15 +64,15 @@ def should_heartbeat(self): def session_timeout_expired(self): last_recv = max(self.last_receive, self.last_reset) - return (time.time() - last_recv) > (self.config['session_timeout_ms'] / 1000) + return (time.monotonic() - last_recv) > (self.config['session_timeout_ms'] / 1000) def reset_timeouts(self): - 
self.last_reset = time.time() - self.last_poll = time.time() + self.last_reset = time.monotonic() + self.last_poll = time.monotonic() self.heartbeat_failed = False def poll_timeout_expired(self): - return (time.time() - self.last_poll) > (self.config['max_poll_interval_ms'] / 1000) + return (time.monotonic() - self.last_poll) > (self.config['max_poll_interval_ms'] / 1000) def __str__(self): return (" + return ((time.monotonic() * 1000 - self._last_record_time) > self._inactive_sensor_expiration_time_ms) diff --git a/kafka/producer/producer_batch.py b/kafka/producer/producer_batch.py index 71dea4fbd..85c427aa9 100644 --- a/kafka/producer/producer_batch.py +++ b/kafka/producer/producer_batch.py @@ -17,7 +17,7 @@ class FinalState(IntEnum): class ProducerBatch(object): def __init__(self, tp, records, now=None): - now = time.time() if now is None else now + now = time.monotonic() if now is None else now self.max_record_size = 0 self.created = now self.drained = None @@ -55,7 +55,7 @@ def try_append(self, timestamp_ms, key, value, headers, now=None): if metadata is None: return None - now = time.time() if now is None else now + now = time.monotonic() if now is None else now self.max_record_size = max(self.max_record_size, metadata.size) self.last_append = now future = FutureRecordMetadata( @@ -151,14 +151,14 @@ def _complete_future(self, base_offset, timestamp_ms, record_exceptions_fn): self.produce_future.success((base_offset, timestamp_ms, record_exceptions_fn)) def has_reached_delivery_timeout(self, delivery_timeout_ms, now=None): - now = time.time() if now is None else now + now = time.monotonic() if now is None else now return delivery_timeout_ms / 1000 <= now - self.created def in_retry(self): return self._retry def retry(self, now=None): - now = time.time() if now is None else now + now = time.monotonic() if now is None else now self._retry = True self.attempts += 1 self.last_attempt = now diff --git a/kafka/producer/record_accumulator.py 
b/kafka/producer/record_accumulator.py index e779b1668..ccff983b3 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -129,7 +129,7 @@ def append(self, tp, timestamp_ms, key, value, headers, now=None): """ assert isinstance(tp, TopicPartition), 'not TopicPartition' assert not self._closed, 'RecordAccumulator is closed' - now = time.time() if now is None else now + now = time.monotonic() if now is None else now # We keep track of the number of appending thread to make sure we do # not miss batches in abortIncompleteBatches(). self._appends_in_progress.increment() @@ -250,7 +250,7 @@ def ready(self, cluster, now=None): ready_nodes = set() next_ready_check = 9999999.99 unknown_leaders_exist = False - now = time.time() if now is None else now + now = time.monotonic() if now is None else now # several threads are accessing self._batches -- to simplify # concurrent access, we iterate over a snapshot of partitions @@ -316,7 +316,7 @@ def _should_stop_drain_batches_for_partition(self, first, tp): return False def drain_batches_for_one_node(self, cluster, node_id, max_size, now=None): - now = time.time() if now is None else now + now = time.monotonic() if now is None else now size = 0 ready = [] partitions = list(cluster.partitions_for_broker(node_id)) @@ -405,7 +405,7 @@ def drain(self, cluster, nodes, max_size, now=None): if not nodes: return {} - now = time.time() if now is None else now + now = time.monotonic() if now is None else now batches = {} for node_id in nodes: batches[node_id] = self.drain_batches_for_one_node(cluster, node_id, max_size, now=now) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 7df76ea24..1463ec8ba 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -177,7 +177,7 @@ def run_once(self): self._client.poll(timeout_ms=poll_timeout_ms) def _send_producer_data(self, now=None): - now = time.time() if now is None else now + now = time.monotonic() if now is None else 
now # get the list of partitions with data ready to send result = self._accumulator.ready(self._metadata, now=now) ready_nodes, next_ready_check_delay, unknown_leaders_exist = result @@ -232,7 +232,7 @@ def _send_producer_data(self, now=None): for expired_batch in expired_batches: error_message = "Expiring %d record(s) for %s: %s ms has passed since batch creation" % ( expired_batch.record_count, expired_batch.topic_partition, - int((time.time() - expired_batch.created) * 1000)) + int((time.monotonic() - expired_batch.created) * 1000)) self._fail_batch(expired_batch, PartitionResponse(error=Errors.KafkaTimeoutError, error_message=error_message)) if self._sensors: @@ -273,7 +273,7 @@ def _send_producer_data(self, now=None): log.debug('%s: Sending Produce Request: %r', str(self), request) (self._client.send(node_id, request, wakeup=False) .add_callback( - self._handle_produce_response, node_id, time.time(), batches) + self._handle_produce_response, node_id, time.monotonic(), batches) .add_errback( self._failed_produce, batches, node_id)) return poll_timeout_ms diff --git a/kafka/util.py b/kafka/util.py index 5c7dd927c..6ebcc4f95 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -12,7 +12,7 @@ class Timer: def __init__(self, timeout_ms, error_message=None, start_at=None): self._timeout_ms = timeout_ms - self._start_at = start_at or time.time() + self._start_at = start_at or time.monotonic() if timeout_ms is not None: self._expire_at = self._start_at + timeout_ms / 1000 else: @@ -21,7 +21,7 @@ def __init__(self, timeout_ms, error_message=None, start_at=None): @property def expired(self): - return time.time() >= self._expire_at + return time.monotonic() >= self._expire_at @property def timeout_ms(self): @@ -29,7 +29,7 @@ def timeout_ms(self): return None elif self._expire_at == float('inf'): return float('inf') - remaining = self._expire_at - time.time() + remaining = self._expire_at - time.monotonic() if remaining < 0: return 0 else: @@ -37,7 +37,7 @@ def 
timeout_ms(self): @property def elapsed_ms(self): - return int(1000 * (time.time() - self._start_at)) + return int(1000 * (time.monotonic() - self._start_at)) def maybe_raise(self): if self.expired: diff --git a/test/test_client_async.py b/test/test_client_async.py index b4811d346..f37906074 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -233,8 +233,8 @@ def test_send(client_selector_mocked, conn): def test_poll(mocker, client_poll_mocked): metadata = mocker.patch.object(client_poll_mocked, '_maybe_refresh_metadata') ifr_request_timeout = mocker.patch.object(client_poll_mocked, '_next_ifr_request_timeout_ms') - now = time.time() - t = mocker.patch('time.time') + now = time.monotonic() + t = mocker.patch('time.monotonic') t.return_value = now # metadata timeout wins @@ -302,8 +302,8 @@ def test_maybe_refresh_metadata_ttl(client_poll_mocked): def test_maybe_refresh_metadata_backoff(mocker, client_poll_mocked): mocker.patch.object(client_poll_mocked, 'least_loaded_node', return_value=None) mocker.patch.object(client_poll_mocked, 'least_loaded_node_refresh_ms', return_value=4321) - now = time.time() - t = mocker.patch('time.time') + now = time.monotonic() + t = mocker.patch('time.monotonic') t.return_value = now client_poll_mocked.poll(timeout_ms=12345678) @@ -336,8 +336,8 @@ def test_maybe_refresh_metadata_cant_send(mocker, client_poll_mocked): mocker.patch.object(client_poll_mocked, '_can_connect', return_value=True) mocker.patch.object(client_poll_mocked, '_init_connect', return_value=True) - now = time.time() - t = mocker.patch('time.time') + now = time.monotonic() + t = mocker.patch('time.monotonic') t.return_value = now # first poll attempts connection @@ -363,7 +363,7 @@ def test_unschedule(): def test_idle_connection_manager(mocker): - t = mocker.patch.object(time, 'time') + t = mocker.patch.object(time, 'monotonic') t.return_value = 0 idle = IdleConnectionManager(100) diff --git a/test/test_conn.py b/test/test_conn.py index 
d8db22966..a29842f63 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -122,7 +122,7 @@ def test_connect_timeout(_socket, conn): def test_blacked_out(conn): - with mock.patch("time.time", return_value=1000): + with mock.patch("time.monotonic", return_value=1000): conn.last_attempt = 0 assert conn.blacked_out() is False conn.last_attempt = 1000 @@ -131,7 +131,7 @@ def test_blacked_out(conn): def test_connection_delay(conn, mocker): mocker.patch.object(conn, '_reconnect_jitter_pct', return_value=1.0) - with mock.patch("time.time", return_value=1000): + with mock.patch("time.monotonic", return_value=1000): conn.last_attempt = 1000 assert conn.connection_delay() == conn.config['reconnect_backoff_ms'] conn.state = ConnectionStates.CONNECTING @@ -385,7 +385,7 @@ def test_relookup_on_failure(): def test_requests_timed_out(conn): - with mock.patch("time.time", return_value=0): + with mock.patch("time.monotonic", return_value=0): # No in-flight requests, not timed out assert not conn.requests_timed_out() @@ -415,7 +415,7 @@ def test_maybe_throttle(conn): conn._maybe_throttle(HeartbeatResponse[0](error_code=0)) assert not conn.throttled() - with mock.patch("time.time", return_value=1000) as time: + with mock.patch("time.monotonic", return_value=1000) as time: # server-side throttling in v1.0 conn.config['api_version'] = (1, 0) conn._maybe_throttle(HeartbeatResponse[1](throttle_time_ms=1000, error_code=0)) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 3032a4973..542a7b313 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -531,7 +531,7 @@ def test_send_offset_commit_request_success(mocker, patched_coord, offsets): def test_handle_offset_commit_response(mocker, patched_coord, offsets, response, error, dead): future = Future() - patched_coord._handle_offset_commit_response(offsets, future, time.time(), + patched_coord._handle_offset_commit_response(offsets, future, time.monotonic(), response) assert 
isinstance(future.exception, error) if error else True assert patched_coord.coordinator_id is (None if dead else 0) diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 71c726bc8..9e4927c9c 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -422,7 +422,7 @@ def test_fetched_records(fetcher, topic, mocker): ]) def test__handle_fetch_response(fetcher, fetch_offsets, fetch_response, num_partitions): fetcher._nodes_with_pending_fetch_requests.add(0) - fetcher._handle_fetch_response(0, fetch_offsets, time.time(), fetch_response) + fetcher._handle_fetch_response(0, fetch_offsets, time.monotonic(), fetch_response) assert len(fetcher._completed_fetches) == num_partitions diff --git a/test/test_metrics.py b/test/test_metrics.py index 07c0e838a..be909d308 100644 --- a/test/test_metrics.py +++ b/test/test_metrics.py @@ -66,7 +66,7 @@ def test_MetricName(): def test_simple_stats(mocker, time_keeper, metrics): - mocker.patch('time.time', side_effect=time_keeper.time) + mocker.patch('time.monotonic', side_effect=time_keeper.time) config = metrics._config measurable = ConstantMeasurable() @@ -215,7 +215,7 @@ def test_remove_sensor(metrics): def test_remove_inactive_metrics(mocker, time_keeper, metrics): - mocker.patch('time.time', side_effect=time_keeper.time) + mocker.patch('time.monotonic', side_effect=time_keeper.time) s1 = metrics.sensor('test.s1', None, 1) s1.add(metrics.metric_name('test.s1.count', 'grp1'), Count()) @@ -289,7 +289,7 @@ def test_remove_metric(metrics): def test_event_windowing(mocker, time_keeper): - mocker.patch('time.time', side_effect=time_keeper.time) + mocker.patch('time.monotonic', side_effect=time_keeper.time) count = Count() config = MetricConfig(event_window=1, samples=2) @@ -301,7 +301,7 @@ def test_event_windowing(mocker, time_keeper): def test_time_windowing(mocker, time_keeper): - mocker.patch('time.time', side_effect=time_keeper.time) + mocker.patch('time.monotonic', side_effect=time_keeper.time) count = Count() config = 
MetricConfig(time_window_ms=1, samples=2) @@ -315,7 +315,7 @@ def test_time_windowing(mocker, time_keeper): def test_old_data_has_no_effect(mocker, time_keeper): - mocker.patch('time.time', side_effect=time_keeper.time) + mocker.patch('time.monotonic', side_effect=time_keeper.time) max_stat = Max() min_stat = Min() @@ -401,7 +401,7 @@ def test_Percentiles(metrics): assert p75.value() < 1.0 def test_rate_windowing(mocker, time_keeper, metrics): - mocker.patch('time.time', side_effect=time_keeper.time) + mocker.patch('time.monotonic', side_effect=time_keeper.time) # Use the default time window. Set 3 samples config = MetricConfig(samples=3) @@ -425,7 +425,7 @@ def test_rate_windowing(mocker, time_keeper, metrics): kafka_metric = metrics.metrics.get(metrics.metric_name('test.rate', 'grp1')) assert abs((sum_val / elapsed_secs) - kafka_metric.value()) < EPS, \ 'Rate(0...2) = 2.666' - assert abs(elapsed_secs - (kafka_metric.measurable.window_size(config, time.time() * 1000) / 1000.0)) \ + assert abs(elapsed_secs - (kafka_metric.measurable.window_size(config, time.monotonic() * 1000) / 1000.0)) \ < EPS, 'Elapsed Time = 75 seconds' @@ -469,7 +469,7 @@ class TimeKeeper(object): A clock that you can manually advance by calling sleep """ def __init__(self, auto_tick_ms=0): - self._millis = time.time() * 1000 + self._millis = time.monotonic() * 1000 self._auto_tick_ms = auto_tick_ms def time(self): diff --git a/test/test_sender.py b/test/test_sender.py index 72b8f9a56..4f44112e3 100644 --- a/test/test_sender.py +++ b/test/test_sender.py @@ -125,7 +125,7 @@ def test_complete_batch_transaction(sender, transaction_manager): (Errors.TransactionalIdAuthorizationFailedError, False), ]) def test_complete_batch_error(sender, error, refresh_metadata): - sender._client.cluster._last_successful_refresh_ms = (time.time() - 10) * 1000 + sender._client.cluster._last_successful_refresh_ms = (time.monotonic() - 10) * 1000 sender._client.cluster._need_update = False sender.config['retries'] = 
0 assert sender._client.cluster.ttl() > 0 @@ -243,7 +243,7 @@ def test_run_once(): def test__send_producer_data_expiry_time_reset(sender, accumulator, mocker): - now = time.time() + now = time.monotonic() tp = TopicPartition('foo', 0) mocker.patch.object(sender, '_failed_produce') result = accumulator.append(tp, 0, b'key', b'value', [], now=now) diff --git a/test/testutil.py b/test/testutil.py index 1e1f8e198..48459c2c8 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -52,9 +52,9 @@ def maybe_skip_unsupported_compression(compression_type): class Timer(object): def __enter__(self): - self.start = time.time() + self.start = time.monotonic() return self def __exit__(self, *args): - self.end = time.time() + self.end = time.monotonic() self.interval = self.end - self.start From 0110e11315ef43027383814fbf5dcee34033d787 Mon Sep 17 00:00:00 2001 From: Varun Chawla Date: Fri, 27 Feb 2026 22:11:50 -0800 Subject: [PATCH 2/2] Update integration test to use time.monotonic() for heartbeat comparisons The test_heartbeat_thread and test_group integration tests were comparing heartbeat timestamps (which now use time.monotonic()) against time.time() values. Since these two clocks have different epochs, the comparisons fail on CI. Switch the tests to use time.monotonic() to match the source code changes. 
--- test/integration/test_consumer_group.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/integration/test_consumer_group.py b/test/integration/test_consumer_group.py index c0b4857a1..2dd4f1093 100644 --- a/test/integration/test_consumer_group.py +++ b/test/integration/test_consumer_group.py @@ -73,9 +73,9 @@ def consumer_thread(i): threads[i] = t try: - timeout = time.time() + 15 + timeout = time.monotonic() + 15 while True: - assert time.time() < timeout, "timeout waiting for assignments" + assert time.monotonic() < timeout, "timeout waiting for assignments" # Verify all consumers have been created missing_consumers = set(consumers.keys()) - set(range(num_consumers)) if missing_consumers: @@ -128,7 +128,7 @@ def consumer_thread(i): while True: for c in range(num_consumers): heartbeat = consumers[c]._coordinator.heartbeat - last_hb = time.time() - 0.5 + last_hb = time.monotonic() - 0.5 if (heartbeat.heartbeat_failed or heartbeat.last_receive < last_hb or heartbeat.last_reset > last_hb): @@ -176,7 +176,7 @@ def test_heartbeat_thread(kafka_broker, topic): heartbeat_interval_ms=500) # poll until we have joined group / have assignment - start = time.time() + start = time.monotonic() while not consumer.assignment(): consumer.poll(timeout_ms=100) @@ -193,9 +193,9 @@ def test_heartbeat_thread(kafka_broker, topic): assert last_send > start assert last_recv > start - timeout = time.time() + 30 + timeout = time.monotonic() + 30 while True: - if time.time() > timeout: + if time.monotonic() > timeout: raise RuntimeError('timeout waiting for heartbeat') if consumer._coordinator.heartbeat.last_receive > last_recv: break