diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 6b57091d5..0319127b8 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -256,8 +256,8 @@ def _refresh_controller_id(self, timeout_ms=30000): # use defaults for allow_auto_topic_creation / include_authorized_operations in v6+ request = MetadataRequest[version]() - timeout_at = time.time() + timeout_ms / 1000 - while time.time() < timeout_at: + timeout_at = time.monotonic() + timeout_ms / 1000 + while time.monotonic() < timeout_at: response = self.send_request(request) controller_id = response.controller_id if controller_id == -1: diff --git a/kafka/benchmarks/consumer_performance.py b/kafka/benchmarks/consumer_performance.py index e76264145..f1532f346 100644 --- a/kafka/benchmarks/consumer_performance.py +++ b/kafka/benchmarks/consumer_performance.py @@ -52,14 +52,14 @@ def run(args): print('-> OK!') print() - start_time = time.time() + start_time = time.monotonic() records = 0 for msg in consumer: records += 1 if records >= args.num_records: break - end_time = time.time() + end_time = time.monotonic() timer_stop.set() timer.join() print('Consumed {0} records'.format(records)) diff --git a/kafka/benchmarks/producer_performance.py b/kafka/benchmarks/producer_performance.py index df7c51f29..3ab81ffea 100644 --- a/kafka/benchmarks/producer_performance.py +++ b/kafka/benchmarks/producer_performance.py @@ -66,9 +66,9 @@ def _benchmark(): raise ValueError(r) print("%d suceeded, %d failed" % (count_success, count_failure)) - start_time = time.time() + start_time = time.monotonic() _benchmark() - end_time = time.time() + end_time = time.monotonic() timer_stop.set() timer.join() print('Execution time:', end_time - start_time, 'secs') diff --git a/kafka/client_async.py b/kafka/client_async.py index 734efb65b..9a89ba20c 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -304,7 +304,7 @@ def _conn_state_change(self, node_id, sock, conn): self._selector.modify(sock, selectors.EVENT_WRITE, conn) if self.cluster.is_bootstrap(node_id): - self._last_bootstrap = time.time() + self._last_bootstrap = time.monotonic() elif conn.state is ConnectionStates.API_VERSIONS_SEND: try: @@ -708,9 +708,9 @@ def _poll(self, timeout): # Send pending requests first, before polling for responses self._register_send_sockets() - start_select = time.time() + start_select = time.monotonic() ready = self._selector.select(timeout) - end_select = time.time() + end_select = time.monotonic() if self._sensors: self._sensors.select_time.record((end_select - start_select) * 1000000000) @@ -782,7 +782,7 @@ def _poll(self, timeout): timeout_ms)) if self._sensors: - self._sensors.io_time.record((time.time() - end_select) * 1000000000) + self._sensors.io_time.record((time.monotonic() - end_select) * 1000000000) self._maybe_close_oldest_connection() @@ -1020,9 +1020,9 @@ def check_version(self, node_id=None, timeout=None, **kwargs): """ timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000) with self._lock: - end = time.time() + timeout - while time.time() < end: - time_remaining = max(end - time.time(), 0) + end = time.monotonic() + timeout + while time.monotonic() < end: + time_remaining = max(end - time.monotonic(), 0) if node_id is not None and self.connection_delay(node_id) > 0: sleep_time = min(time_remaining, self.connection_delay(node_id) / 1000.0) if sleep_time > 0: @@ -1043,8 +1043,8 @@ def check_version(self, node_id=None, timeout=None, **kwargs): continue conn = self._conns[try_node] - while conn.connecting() and time.time() < end: - timeout_ms = min((end - time.time()) * 1000, 200) + while conn.connecting() and time.monotonic() < end: + timeout_ms = min((end - time.monotonic()) * 1000, 200) self.poll(timeout_ms=timeout_ms) if conn._api_version is not None: @@ -1130,7 +1130,7 @@ def _maybe_close_oldest_connection(self): expired_connection = self._idle_expiry_manager.poll_expired_connection() if expired_connection: conn_id, ts = expired_connection - idle_ms = (time.time() - ts) * 1000 + idle_ms = (time.monotonic() - ts) * 1000 log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms) self.close(node_id=conn_id) @@ -1185,14 +1185,14 @@ def __init__(self, connections_max_idle_ms): else: self.connections_max_idle = float('inf') self.next_idle_close_check_time = None - self.update_next_idle_close_check_time(time.time()) + self.update_next_idle_close_check_time(time.monotonic()) self.lru_connections = collections.OrderedDict() def update(self, conn_id): # order should reflect last-update if conn_id in self.lru_connections: del self.lru_connections[conn_id] - self.lru_connections[conn_id] = time.time() + self.lru_connections[conn_id] = time.monotonic() def remove(self, conn_id): if conn_id in self.lru_connections: @@ -1201,10 +1201,10 @@ def remove(self, conn_id): def is_expired(self, conn_id): if conn_id not in self.lru_connections: return None - return time.time() >= self.lru_connections[conn_id] + self.connections_max_idle + return time.monotonic() >= self.lru_connections[conn_id] + self.connections_max_idle def next_check_ms(self): - now = time.time() + now = time.monotonic() if not self.lru_connections or self.next_idle_close_check_time == float('inf'): return float('inf') elif self.next_idle_close_check_time <= now: @@ -1216,7 +1216,7 @@ def update_next_idle_close_check_time(self, ts): self.next_idle_close_check_time = ts + self.connections_max_idle def poll_expired_connection(self): - if time.time() < self.next_idle_close_check_time: + if time.monotonic() < self.next_idle_close_check_time: return None if not len(self.lru_connections): @@ -1228,7 +1228,7 @@ def poll_expired_connection(self): self.update_next_idle_close_check_time(oldest_ts) - if time.time() >= oldest_ts + self.connections_max_idle: + if time.monotonic() >= oldest_ts + self.connections_max_idle: return (oldest_conn_id, oldest_ts) else: return None diff --git a/kafka/cluster.py b/kafka/cluster.py index 09832fde2..98ddd23da 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -169,7 +169,7 @@ def coordinator_for_group(self, group): def ttl(self): """Milliseconds until metadata should be refreshed""" - now = time.time() * 1000 + now = time.monotonic() * 1000 if self._need_update: ttl = 0 else: @@ -231,7 +231,7 @@ def failed_update(self, exception): self._future = None if f: f.failure(exception) - self._last_refresh_ms = time.time() * 1000 + self._last_refresh_ms = time.monotonic() * 1000 def update_metadata(self, metadata): """Update cluster state given a MetadataResponse. @@ -335,7 +335,7 @@ def update_metadata(self, metadata): self._future = None self._need_update = False - now = time.time() * 1000 + now = time.monotonic() * 1000 self._last_refresh_ms = now self._last_successful_refresh_ms = now diff --git a/kafka/conn.py b/kafka/conn.py index bfabde407..80c2120b8 100755 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -326,7 +326,7 @@ def _next_afi_sockaddr(self): def connect_blocking(self, timeout=float('inf')): if self.connected(): return True - timeout += time.time() + timeout += time.monotonic() # First attempt to perform dns lookup # note that the underlying interface, socket.getaddrinfo, # has no explicit timeout so we may exceed the user-specified timeout @@ -335,7 +335,7 @@ def connect_blocking(self, timeout=float('inf')): # Loop once over all returned dns entries selector = None while self._gai: - while time.time() < timeout: + while time.monotonic() < timeout: self.connect() if self.connected(): if selector is not None: @@ -359,7 +359,7 @@ def connect(self): """Attempt to connect and return ConnectionState""" if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out(): self.state = ConnectionStates.CONNECTING - self.last_attempt = time.time() + self.last_attempt = time.monotonic() next_lookup = self._next_afi_sockaddr() if not next_lookup: self.close(Errors.KafkaConnectionError('DNS failure')) @@ -464,7 +464,7 @@ def connect(self): ConnectionStates.DISCONNECTED): # Connection timed out request_timeout = self.config['request_timeout_ms'] / 1000.0 - if time.time() > request_timeout + self.last_attempt: + if time.monotonic() > request_timeout + self.last_attempt: log.error('%s: Connection attempt timed out', self) self.close(Errors.KafkaConnectionError('timeout')) return self.state @@ -776,7 +776,7 @@ def _recv_sasl_authenticate(self): if version == 1: ((correlation_id, response),) = self._protocol.receive_bytes(data) (future, timestamp, _timeout) = self.in_flight_requests.pop(correlation_id) - latency_ms = (time.time() - timestamp) * 1000 + latency_ms = (time.monotonic() - timestamp) * 1000 if self._sensors: self._sensors.request_time.record(latency_ms) log.debug('%s: Response %d (%s ms): %s', self, correlation_id, latency_ms, response) @@ -834,7 +834,7 @@ def throttle_delay(self): Return the number of milliseconds to wait until connection is no longer throttled. """ if self._throttle_time is not None: - remaining_ms = (self._throttle_time - time.time()) * 1000 + remaining_ms = (self._throttle_time - time.monotonic()) * 1000 if remaining_ms > 0: return remaining_ms else: @@ -855,7 +855,7 @@ def connection_delay(self): elif self.config["socks5_proxy"] and Socks5Wrapper.use_remote_lookup(self.config["socks5_proxy"]): return 0 else: - time_waited = time.time() - self.last_attempt + time_waited = time.monotonic() - self.last_attempt return max(self._reconnect_backoff - time_waited, 0) * 1000 else: # When connecting or connected, we should be able to delay @@ -1006,7 +1006,7 @@ def _send(self, request, blocking=True, request_timeout_ms=None): log.debug('%s: Request %d (timeout_ms %s): %s', self, correlation_id, request_timeout_ms, request) if request.expect_response(): assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!' - sent_time = time.time() + sent_time = time.monotonic() timeout_at = sent_time + (request_timeout_ms / 1000) self.in_flight_requests[correlation_id] = (future, sent_time, timeout_at) else: @@ -1093,7 +1093,7 @@ def _maybe_throttle(self, response): # Client side throttling enabled in v2.0 brokers # prior to that throttling (if present) was managed broker-side if self.config['api_version'] is not None and self.config['api_version'] >= (2, 0): - throttle_time = time.time() + throttle_time_ms / 1000 + throttle_time = time.monotonic() + throttle_time_ms / 1000 self._throttle_time = max(throttle_time, self._throttle_time or 0) log.warning("%s: %s throttled by broker (%d ms)", self, response.__class__.__name__, throttle_time_ms) @@ -1129,7 +1129,7 @@ def recv(self): except KeyError: self.close(Errors.KafkaConnectionError('Received unrecognized correlation id')) return () - latency_ms = (time.time() - timestamp) * 1000 + latency_ms = (time.monotonic() - timestamp) * 1000 if self._sensors: self._sensors.request_time.record(latency_ms) @@ -1193,7 +1193,7 @@ def requests_timed_out(self): return self.next_ifr_request_timeout_ms() == 0 def timed_out_ifrs(self): - now = time.time() + now = time.monotonic() ifrs = sorted(self.in_flight_requests.values(), reverse=True, key=lambda ifr: ifr[2]) return list(filter(lambda ifr: ifr[2] <= now, ifrs)) @@ -1204,7 +1204,7 @@ def get_timeout(v): return v[2] next_timeout = min(map(get_timeout, self.in_flight_requests.values())) - return max(0, (next_timeout - time.time()) * 1000) + return max(0, (next_timeout - time.monotonic()) * 1000) else: return float('inf') @@ -1275,8 +1275,8 @@ def check_version(self, timeout=2, **kwargs): Raises: NodeNotReadyError on timeout """ - timeout_at = time.time() + timeout - if not self.connect_blocking(timeout_at - time.time()): + timeout_at = time.monotonic() + timeout + if not self.connect_blocking(timeout_at - time.monotonic()): raise Errors.NodeNotReadyError() else: return self._api_version diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index 8c2a41ba0..d3a600c6f 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -147,7 +147,7 @@ def send_fetches(self): log.debug("Sending FetchRequest to node %s", node_id) self._nodes_with_pending_fetch_requests.add(node_id) future = self._client.send(node_id, request, wakeup=False) - future.add_callback(self._handle_fetch_response, node_id, fetch_offsets, time.time()) + future.add_callback(self._handle_fetch_response, node_id, fetch_offsets, time.monotonic()) future.add_errback(self._handle_fetch_error, node_id) future.add_both(self._clear_pending_fetch_request, node_id) futures.append(future) @@ -421,13 +421,13 @@ def _reset_offsets_async(self, timestamps): if not self._client.ready(node_id): continue partitions = set(timestamps_and_epochs.keys()) - expire_at = time.time() + self.config['request_timeout_ms'] / 1000 + expire_at = time.monotonic() + self.config['request_timeout_ms'] / 1000 self._subscriptions.set_reset_pending(partitions, expire_at) def on_success(timestamps_and_epochs, result): fetched_offsets, partitions_to_retry = result if partitions_to_retry: - self._subscriptions.reset_failed(partitions_to_retry, time.time() + self.config['retry_backoff_ms'] / 1000) + self._subscriptions.reset_failed(partitions_to_retry, time.monotonic() + self.config['retry_backoff_ms'] / 1000) self._client.cluster.request_update() for partition, offset in fetched_offsets.items(): @@ -435,7 +435,7 @@ def on_success(timestamps_and_epochs, result): self._reset_offset_if_needed(partition, ts, offset.offset) def on_failure(partitions, error): - self._subscriptions.reset_failed(partitions, time.time() + self.config['retry_backoff_ms'] / 1000) + self._subscriptions.reset_failed(partitions, time.monotonic() + self.config['retry_backoff_ms'] / 1000) self._client.cluster.request_update() if not getattr(error, 'retriable', False): @@ -778,7 +778,7 @@ def _handle_fetch_response(self, node_id, fetch_offsets, send_time, response): self._completed_fetches.append(completed_fetch) if self._sensors: - self._sensors.fetch_latency.record((time.time() - send_time) * 1000) + self._sensors.fetch_latency.record((time.monotonic() - send_time) * 1000) def _handle_fetch_error(self, node_id, exception): level = logging.INFO if isinstance(exception, Errors.Cancelled) else logging.ERROR diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 7c8b1cb2f..e3c2672c3 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -1176,7 +1176,7 @@ def _update_fetch_positions(self, timeout_ms=None): return not self._fetcher.reset_offsets_if_needed() def _message_generator_v2(self): - timeout_ms = 1000 * max(0, self._consumer_timeout - time.time()) + timeout_ms = 1000 * max(0, self._consumer_timeout - time.monotonic()) record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False) for tp, records in record_map.items(): # Generators are stateful, and it is possible that the tp / records @@ -1201,7 +1201,7 @@ def __next__(self): if self._closed: raise StopIteration('KafkaConsumer closed') self._set_consumer_timeout() - while time.time() < self._consumer_timeout: + while time.monotonic() < self._consumer_timeout: if not self._iterator: self._iterator = self._message_generator_v2() try: @@ -1213,5 +1213,5 @@ def __next__(self): def _set_consumer_timeout(self): # consumer_timeout_ms can be used to stop iteration early if self.config['consumer_timeout_ms'] >= 0: - self._consumer_timeout = time.time() + ( + self._consumer_timeout = time.monotonic() + ( self.config['consumer_timeout_ms'] / 1000.0) diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py index 578e70a07..32a2ae4ce 100644 --- a/kafka/consumer/subscription_state.py +++ b/kafka/consumer/subscription_state.py @@ -462,7 +462,7 @@ def reset(self, strategy): self.next_allowed_retry_time = None def is_reset_allowed(self): - return self.next_allowed_retry_time is None or self.next_allowed_retry_time < time.time() + return self.next_allowed_retry_time is None or self.next_allowed_retry_time < time.monotonic() @property def awaiting_reset(self): diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 12e7ac40b..5658e31cc 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -574,7 +574,7 @@ def _send_join_group_request(self): log.debug("Sending JoinGroup (%s) to coordinator %s", request, self.coordinator_id) future = Future() _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_join_group_response, future, time.time()) + _f.add_callback(self._handle_join_group_response, future, time.monotonic()) _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future @@ -598,7 +598,7 @@ def _handle_join_group_response(self, future, send_time, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: if self._sensors: - self._sensors.join_latency.record((time.time() - send_time) * 1000) + self._sensors.join_latency.record((time.monotonic() - send_time) * 1000) with self._lock: if self.state is not MemberState.REBALANCING: # if the consumer was woken up before a rebalance completes, @@ -744,7 +744,7 @@ def _send_sync_group_request(self, request): future = Future() _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_sync_group_response, future, time.time()) + _f.add_callback(self._handle_sync_group_response, future, time.monotonic()) _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future @@ -754,7 +754,7 @@ def _handle_sync_group_response(self, future, send_time, response): error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: if self._sensors: - self._sensors.sync_latency.record((time.time() - send_time) * 1000) + self._sensors.sync_latency.record((time.monotonic() - send_time) * 1000) future.success(response.member_assignment) return @@ -997,14 +997,14 @@ def _send_heartbeat_request(self): heartbeat_log.debug("Sending HeartbeatRequest to %s: %s", self.coordinator_id, request) future = Future() _f = self._client.send(self.coordinator_id, request) - _f.add_callback(self._handle_heartbeat_response, future, time.time()) + _f.add_callback(self._handle_heartbeat_response, future, time.monotonic()) _f.add_errback(self._failed_request, self.coordinator_id, request, future) return future def _handle_heartbeat_response(self, future, send_time, response): if self._sensors: - self._sensors.heartbeat_latency.record((time.time() - send_time) * 1000) + self._sensors.heartbeat_latency.record((time.monotonic() - send_time) * 1000) heartbeat_log.debug("Received heartbeat response for group %s: %s", self.group_id, response) error_type = Errors.for_code(response.error_code) diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 58869d61a..25da9fe1b 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -127,7 +127,7 @@ def __init__(self, client, subscription, **configs): log.warning('group_id is None: disabling auto-commit.') self.config['enable_auto_commit'] = False else: - self.next_auto_commit_deadline = time.time() + self.auto_commit_interval + self.next_auto_commit_deadline = time.monotonic() + self.auto_commit_interval if self.config['metrics']: self._consumer_sensors = ConsumerCoordinatorMetrics( @@ -250,7 +250,7 @@ def _on_join_complete(self, generation, member_id, protocol, assignor.on_generation_assignment(generation) # reschedule the auto commit starting from now - self.next_auto_commit_deadline = time.time() + self.auto_commit_interval + self.next_auto_commit_deadline = time.monotonic() + self.auto_commit_interval assigned = set(self._subscription.assigned_partitions()) log.info("Setting newly assigned partitions %s for group %s", @@ -321,10 +321,10 @@ def time_to_next_poll(self): if not self.config['enable_auto_commit']: return self.time_to_next_heartbeat() - if time.time() > self.next_auto_commit_deadline: + if time.monotonic() > self.next_auto_commit_deadline: return 0 - return min(self.next_auto_commit_deadline - time.time(), + return min(self.next_auto_commit_deadline - time.monotonic(), self.time_to_next_heartbeat()) def _perform_assignment(self, leader_id, assignment_strategy, members): @@ -753,7 +753,7 @@ def _send_offset_commit_request(self, offsets): future = Future() _f = self._client.send(node_id, request) - _f.add_callback(self._handle_offset_commit_response, offsets, future, time.time()) + _f.add_callback(self._handle_offset_commit_response, offsets, future, time.monotonic()) _f.add_errback(self._failed_request, node_id, request, future) return future @@ -761,7 +761,7 @@ def _handle_offset_commit_response(self, offsets, future, send_time, response): log.debug("Received OffsetCommitResponse: %s", response) # TODO look at adding request_latency_ms to response (like java kafka) if self._consumer_sensors: - self._consumer_sensors.commit_latency.record((time.time() - send_time) * 1000) + self._consumer_sensors.commit_latency.record((time.monotonic() - send_time) * 1000) unauthorized_topics = set() for topic, partitions in response.topics: @@ -961,15 +961,15 @@ def _default_offset_commit_callback(self, offsets, res_or_exc): def _commit_offsets_async_on_complete(self, offsets, res_or_exc): if isinstance(res_or_exc, Exception) and getattr(res_or_exc, 'retriable', False): - self.next_auto_commit_deadline = min(time.time() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline) + self.next_auto_commit_deadline = min(time.monotonic() + self.config['retry_backoff_ms'] / 1000, self.next_auto_commit_deadline) self.config['default_offset_commit_callback'](offsets, res_or_exc) def _maybe_auto_commit_offsets_async(self): if self.config['enable_auto_commit']: if self.coordinator_unknown(): - self.next_auto_commit_deadline = time.time() + self.config['retry_backoff_ms'] / 1000 - elif time.time() > self.next_auto_commit_deadline: - self.next_auto_commit_deadline = time.time() + self.auto_commit_interval + self.next_auto_commit_deadline = time.monotonic() + self.config['retry_backoff_ms'] / 1000 + elif time.monotonic() > self.next_auto_commit_deadline: + self.next_auto_commit_deadline = time.monotonic() + self.auto_commit_interval self._do_auto_commit_offsets_async() def maybe_auto_commit_offsets_now(self): diff --git a/kafka/coordinator/heartbeat.py b/kafka/coordinator/heartbeat.py index fb40c302b..4bf04d4ee 100644 --- a/kafka/coordinator/heartbeat.py +++ b/kafka/coordinator/heartbeat.py @@ -34,25 +34,25 @@ def __init__(self, **configs): self.last_send = -1 * float('inf') self.last_receive = -1 * float('inf') self.last_poll = -1 * float('inf') - self.last_reset = time.time() + self.last_reset = time.monotonic() self.heartbeat_failed = None def poll(self): - self.last_poll = time.time() + self.last_poll = time.monotonic() def sent_heartbeat(self): - self.last_send = time.time() + self.last_send = time.monotonic() self.heartbeat_failed = False def fail_heartbeat(self): self.heartbeat_failed = True def received_heartbeat(self): - self.last_receive = time.time() + self.last_receive = time.monotonic() def time_to_next_heartbeat(self): """Returns seconds (float) remaining before next heartbeat should be sent""" - time_since_last_heartbeat = time.time() - max(self.last_send, self.last_reset) + time_since_last_heartbeat = time.monotonic() - max(self.last_send, self.last_reset) if self.heartbeat_failed: delay_to_next_heartbeat = self.config['retry_backoff_ms'] / 1000 else: @@ -64,15 +64,15 @@ def should_heartbeat(self): def session_timeout_expired(self): last_recv = max(self.last_receive, self.last_reset) - return (time.time() - last_recv) > (self.config['session_timeout_ms'] / 1000) + return (time.monotonic() - last_recv) > (self.config['session_timeout_ms'] / 1000) def reset_timeouts(self): - self.last_reset = time.time() - self.last_poll = time.time() + self.last_reset = time.monotonic() + self.last_poll = time.monotonic() self.heartbeat_failed = False def poll_timeout_expired(self): - return (time.time() - self.last_poll) > (self.config['max_poll_interval_ms'] / 1000) + return (time.monotonic() - self.last_poll) > (self.config['max_poll_interval_ms'] / 1000) def __str__(self): return (" + return ((time.monotonic() * 1000 - self._last_record_time) > self._inactive_sensor_expiration_time_ms) diff --git a/kafka/producer/producer_batch.py b/kafka/producer/producer_batch.py index 71dea4fbd..85c427aa9 100644 --- a/kafka/producer/producer_batch.py +++ b/kafka/producer/producer_batch.py @@ -17,7 +17,7 @@ class FinalState(IntEnum): class ProducerBatch(object): def __init__(self, tp, records, now=None): - now = time.time() if now is None else now + now = time.monotonic() if now is None else now self.max_record_size = 0 self.created = now self.drained = None @@ -55,7 +55,7 @@ def try_append(self, timestamp_ms, key, value, headers, now=None): if metadata is None: return None - now = time.time() if now is None else now + now = time.monotonic() if now is None else now self.max_record_size = max(self.max_record_size, metadata.size) self.last_append = now future = FutureRecordMetadata( @@ -151,14 +151,14 @@ def _complete_future(self, base_offset, timestamp_ms, record_exceptions_fn): self.produce_future.success((base_offset, timestamp_ms, record_exceptions_fn)) def has_reached_delivery_timeout(self, delivery_timeout_ms, now=None): - now = time.time() if now is None else now + now = time.monotonic() if now is None else now return delivery_timeout_ms / 1000 <= now - self.created def in_retry(self): return self._retry def retry(self, now=None): - now = time.time() if now is None else now + now = time.monotonic() if now is None else now self._retry = True self.attempts += 1 self.last_attempt = now diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index e779b1668..ccff983b3 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -129,7 +129,7 @@ def append(self, tp, timestamp_ms, key, value, headers, now=None): """ assert isinstance(tp, TopicPartition), 'not TopicPartition' assert not self._closed, 'RecordAccumulator is closed' - now = time.time() if now is None else now + now = time.monotonic() if now is None else now # We keep track of the number of appending thread to make sure we do # not miss batches in abortIncompleteBatches(). self._appends_in_progress.increment() @@ -250,7 +250,7 @@ def ready(self, cluster, now=None): ready_nodes = set() next_ready_check = 9999999.99 unknown_leaders_exist = False - now = time.time() if now is None else now + now = time.monotonic() if now is None else now # several threads are accessing self._batches -- to simplify # concurrent access, we iterate over a snapshot of partitions @@ -316,7 +316,7 @@ def _should_stop_drain_batches_for_partition(self, first, tp): return False def drain_batches_for_one_node(self, cluster, node_id, max_size, now=None): - now = time.time() if now is None else now + now = time.monotonic() if now is None else now size = 0 ready = [] partitions = list(cluster.partitions_for_broker(node_id)) @@ -405,7 +405,7 @@ def drain(self, cluster, nodes, max_size, now=None): if not nodes: return {} - now = time.time() if now is None else now + now = time.monotonic() if now is None else now batches = {} for node_id in nodes: batches[node_id] = self.drain_batches_for_one_node(cluster, node_id, max_size, now=now) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 7df76ea24..1463ec8ba 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -177,7 +177,7 @@ def run_once(self): self._client.poll(timeout_ms=poll_timeout_ms) def _send_producer_data(self, now=None): - now = time.time() if now is None else now + now = time.monotonic() if now is None else now # get the list of partitions with data ready to send result = self._accumulator.ready(self._metadata, now=now) ready_nodes, next_ready_check_delay, unknown_leaders_exist = result @@ -232,7 +232,7 @@ def _send_producer_data(self, now=None): for expired_batch in expired_batches: error_message = "Expiring %d record(s) for %s: %s ms has passed since batch creation" % ( expired_batch.record_count, expired_batch.topic_partition, - int((time.time() - expired_batch.created) * 1000)) + int((time.monotonic() - expired_batch.created) * 1000)) self._fail_batch(expired_batch, PartitionResponse(error=Errors.KafkaTimeoutError, error_message=error_message)) if self._sensors: @@ -273,7 +273,7 @@ def _send_producer_data(self, now=None): log.debug('%s: Sending Produce Request: %r', str(self), request) (self._client.send(node_id, request, wakeup=False) .add_callback( - self._handle_produce_response, node_id, time.time(), batches) + self._handle_produce_response, node_id, time.monotonic(), batches) .add_errback( self._failed_produce, batches, node_id)) return poll_timeout_ms diff --git a/kafka/util.py b/kafka/util.py index 5c7dd927c..6ebcc4f95 100644 --- a/kafka/util.py +++ b/kafka/util.py @@ -12,7 +12,7 @@ class Timer: def __init__(self, timeout_ms, error_message=None, start_at=None): self._timeout_ms = timeout_ms - self._start_at = start_at or time.time() + self._start_at = start_at or time.monotonic() if timeout_ms is not None: self._expire_at = self._start_at + timeout_ms / 1000 else: @@ -21,7 +21,7 @@ def __init__(self, timeout_ms, error_message=None, start_at=None): @property def expired(self): - return time.time() >= self._expire_at + return time.monotonic() >= self._expire_at @property def timeout_ms(self): @@ -29,7 +29,7 @@ def timeout_ms(self): return None elif self._expire_at == float('inf'): return float('inf') - remaining = self._expire_at - time.time() + remaining = self._expire_at - time.monotonic() if remaining < 0: return 0 else: @@ -37,7 +37,7 @@ def timeout_ms(self): @property def elapsed_ms(self): - return int(1000 * (time.time() - self._start_at)) + return int(1000 * (time.monotonic() - self._start_at)) def maybe_raise(self): if self.expired: diff --git a/test/integration/test_consumer_group.py b/test/integration/test_consumer_group.py index c0b4857a1..2dd4f1093 100644 --- a/test/integration/test_consumer_group.py +++ b/test/integration/test_consumer_group.py @@ -73,9 +73,9 @@ def consumer_thread(i): threads[i] = t try: - timeout = time.time() + 15 + timeout = time.monotonic() + 15 while True: - assert time.time() < timeout, "timeout waiting for assignments" + assert time.monotonic() < timeout, "timeout waiting for assignments" # Verify all consumers have been created missing_consumers = set(consumers.keys()) - set(range(num_consumers)) if missing_consumers: @@ -128,7 +128,7 @@ def consumer_thread(i): while True: for c in range(num_consumers): heartbeat = consumers[c]._coordinator.heartbeat - last_hb = time.time() - 0.5 + last_hb = time.monotonic() - 0.5 if (heartbeat.heartbeat_failed or heartbeat.last_receive < last_hb or heartbeat.last_reset > last_hb): @@ -176,7 +176,7 @@ def test_heartbeat_thread(kafka_broker, topic): heartbeat_interval_ms=500) # poll until we have joined group / have assignment - start = time.time() + start = time.monotonic() while not consumer.assignment(): consumer.poll(timeout_ms=100) @@ -193,9 +193,9 @@ def test_heartbeat_thread(kafka_broker, topic): assert last_send > start assert last_recv > start - timeout = time.time() + 30 + timeout = time.monotonic() + 30 while True: - if time.time() > timeout: + if time.monotonic() > timeout: raise RuntimeError('timeout waiting for heartbeat') if consumer._coordinator.heartbeat.last_receive > last_recv: break diff --git a/test/test_client_async.py b/test/test_client_async.py index b4811d346..f37906074 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -233,8 +233,8 @@ def test_send(client_selector_mocked, conn): def test_poll(mocker, client_poll_mocked): metadata = mocker.patch.object(client_poll_mocked, '_maybe_refresh_metadata') ifr_request_timeout = mocker.patch.object(client_poll_mocked, '_next_ifr_request_timeout_ms') - now = time.time() - t = mocker.patch('time.time') + now = time.monotonic() + t = mocker.patch('time.monotonic') t.return_value = now # metadata timeout wins @@ -302,8 +302,8 @@ def test_maybe_refresh_metadata_ttl(client_poll_mocked): def test_maybe_refresh_metadata_backoff(mocker, client_poll_mocked): mocker.patch.object(client_poll_mocked, 'least_loaded_node', return_value=None) mocker.patch.object(client_poll_mocked, 'least_loaded_node_refresh_ms', return_value=4321) - now = time.time() - t = mocker.patch('time.time') + now = time.monotonic() + t = mocker.patch('time.monotonic') t.return_value = now client_poll_mocked.poll(timeout_ms=12345678) @@ -336,8 +336,8 @@ def test_maybe_refresh_metadata_cant_send(mocker, client_poll_mocked): mocker.patch.object(client_poll_mocked, '_can_connect', return_value=True) mocker.patch.object(client_poll_mocked, '_init_connect', return_value=True) - now = time.time() - t = mocker.patch('time.time') + now = time.monotonic() + t = mocker.patch('time.monotonic') t.return_value = now # first poll attempts connection @@ -363,7 +363,7 @@ def test_unschedule(): def test_idle_connection_manager(mocker): - t = mocker.patch.object(time, 'time') + t = mocker.patch.object(time, 'monotonic') t.return_value = 0 idle = IdleConnectionManager(100) diff --git a/test/test_conn.py b/test/test_conn.py index d8db22966..a29842f63 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -122,7 +122,7 @@ def test_connect_timeout(_socket, conn): def test_blacked_out(conn): - with mock.patch("time.time", return_value=1000): + with mock.patch("time.monotonic", return_value=1000): conn.last_attempt = 0 assert conn.blacked_out() is False conn.last_attempt = 1000 @@ -131,7 +131,7 @@ def test_blacked_out(conn): def test_connection_delay(conn, mocker): mocker.patch.object(conn, '_reconnect_jitter_pct', return_value=1.0) - with mock.patch("time.time", return_value=1000): + with mock.patch("time.monotonic", return_value=1000): conn.last_attempt = 1000 assert conn.connection_delay() == conn.config['reconnect_backoff_ms'] conn.state = ConnectionStates.CONNECTING @@ -385,7 +385,7 @@ def test_relookup_on_failure(): def test_requests_timed_out(conn): - with mock.patch("time.time", return_value=0): + with mock.patch("time.monotonic", return_value=0): # No in-flight requests, not timed out assert not conn.requests_timed_out() @@ -415,7 +415,7 @@ def test_maybe_throttle(conn): conn._maybe_throttle(HeartbeatResponse[0](error_code=0)) assert not conn.throttled() - with mock.patch("time.time", return_value=1000) as time: + with mock.patch("time.monotonic", return_value=1000) as time: # server-side throttling in v1.0 conn.config['api_version'] = (1, 0) conn._maybe_throttle(HeartbeatResponse[1](throttle_time_ms=1000, error_code=0)) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 3032a4973..542a7b313 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -531,7 +531,7 @@ def test_send_offset_commit_request_success(mocker, patched_coord, offsets): def test_handle_offset_commit_response(mocker, patched_coord, offsets, response, error, dead): future = Future() - patched_coord._handle_offset_commit_response(offsets, future, time.time(), + patched_coord._handle_offset_commit_response(offsets, future, time.monotonic(), response) assert isinstance(future.exception, error) if error else True assert patched_coord.coordinator_id is (None if dead else 0) diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 71c726bc8..9e4927c9c 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -422,7 +422,7 @@ def test_fetched_records(fetcher, topic, mocker): ]) def test__handle_fetch_response(fetcher, fetch_offsets, fetch_response, num_partitions): fetcher._nodes_with_pending_fetch_requests.add(0) - fetcher._handle_fetch_response(0, fetch_offsets, time.time(), fetch_response) + fetcher._handle_fetch_response(0, fetch_offsets, time.monotonic(), fetch_response) assert len(fetcher._completed_fetches) == num_partitions diff --git a/test/test_metrics.py b/test/test_metrics.py index 07c0e838a..be909d308 100644 --- a/test/test_metrics.py +++ b/test/test_metrics.py @@ -66,7 +66,7 @@ def test_MetricName(): def test_simple_stats(mocker, time_keeper, metrics): - mocker.patch('time.time', side_effect=time_keeper.time) + mocker.patch('time.monotonic', side_effect=time_keeper.time) config = metrics._config measurable = ConstantMeasurable() @@ -215,7 +215,7 @@ def test_remove_sensor(metrics): def test_remove_inactive_metrics(mocker, time_keeper, metrics): - mocker.patch('time.time', side_effect=time_keeper.time) + mocker.patch('time.monotonic', side_effect=time_keeper.time) s1 = metrics.sensor('test.s1', None, 1) s1.add(metrics.metric_name('test.s1.count', 'grp1'), Count()) @@ -289,7 +289,7 @@ def test_remove_metric(metrics): def test_event_windowing(mocker, time_keeper): - mocker.patch('time.time', side_effect=time_keeper.time) + mocker.patch('time.monotonic', side_effect=time_keeper.time) count = Count() config = MetricConfig(event_window=1, samples=2) @@ -301,7 +301,7 @@ def test_event_windowing(mocker, time_keeper): def test_time_windowing(mocker, time_keeper): - mocker.patch('time.time', side_effect=time_keeper.time) + mocker.patch('time.monotonic', side_effect=time_keeper.time) count = Count() config = MetricConfig(time_window_ms=1, samples=2) @@ -315,7 +315,7 @@ def test_time_windowing(mocker, time_keeper): def test_old_data_has_no_effect(mocker, time_keeper): - mocker.patch('time.time', side_effect=time_keeper.time) + mocker.patch('time.monotonic', side_effect=time_keeper.time) max_stat = Max() min_stat = Min() @@ -401,7 +401,7 @@ def test_Percentiles(metrics): assert p75.value() < 1.0 def test_rate_windowing(mocker, time_keeper, metrics): - mocker.patch('time.time', side_effect=time_keeper.time) + mocker.patch('time.monotonic', side_effect=time_keeper.time) # Use the default time window. Set 3 samples config = MetricConfig(samples=3) @@ -425,7 +425,7 @@ def test_rate_windowing(mocker, time_keeper, metrics): kafka_metric = metrics.metrics.get(metrics.metric_name('test.rate', 'grp1')) assert abs((sum_val / elapsed_secs) - kafka_metric.value()) < EPS, \ 'Rate(0...2) = 2.666' - assert abs(elapsed_secs - (kafka_metric.measurable.window_size(config, time.time() * 1000) / 1000.0)) \ + assert abs(elapsed_secs - (kafka_metric.measurable.window_size(config, time.monotonic() * 1000) / 1000.0)) \ < EPS, 'Elapsed Time = 75 seconds' @@ -469,7 +469,7 @@ class TimeKeeper(object): A clock that you can manually advance by calling sleep """ def __init__(self, auto_tick_ms=0): - self._millis = time.time() * 1000 + self._millis = time.monotonic() * 1000 self._auto_tick_ms = auto_tick_ms def time(self): diff --git a/test/test_sender.py b/test/test_sender.py index 72b8f9a56..4f44112e3 100644 --- a/test/test_sender.py +++ b/test/test_sender.py @@ -125,7 +125,7 @@ def test_complete_batch_transaction(sender, transaction_manager): (Errors.TransactionalIdAuthorizationFailedError, False), ]) def test_complete_batch_error(sender, error, refresh_metadata): - sender._client.cluster._last_successful_refresh_ms = (time.time() - 10) * 1000 + sender._client.cluster._last_successful_refresh_ms = (time.monotonic() - 10) * 1000 sender._client.cluster._need_update = False sender.config['retries'] = 0 assert sender._client.cluster.ttl() > 0 @@ -243,7 +243,7 @@ def test_run_once(): def test__send_producer_data_expiry_time_reset(sender, accumulator, mocker): - now = time.time() + now = time.monotonic() tp = TopicPartition('foo', 0) mocker.patch.object(sender, '_failed_produce') result = accumulator.append(tp, 0, b'key', b'value', [], now=now) diff --git a/test/testutil.py b/test/testutil.py index 1e1f8e198..48459c2c8 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -52,9 +52,9 @@ def maybe_skip_unsupported_compression(compression_type): class Timer(object): def __enter__(self): - self.start = time.time() + self.start = time.monotonic() return self def __exit__(self, *args): - self.end = time.time() + self.end = time.monotonic() self.interval = self.end - self.start