Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 54 additions & 21 deletions imdclient/IMDClient.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class IMDClient:
buffer_size : int (optional)
:class:`IMDFrameBuffer` will be filled with as many :class:`IMDFrame` fit in `buffer_size` bytes [``10MB``]
timeout : int, optional
Timeout for the socket in seconds [``5``]
Timeout for the socket in seconds [``600``]
continue_after_disconnect : bool, optional [``None``]
If True, the client will attempt to change the simulation engine's waiting behavior to
non-blocking after the client disconnects. If False, the client will attempt to change it
Expand All @@ -73,6 +73,14 @@ def __init__(
continue_after_disconnect=None,
**kwargs,
):

# Warn if timeout is overly optimistic
if "timeout" in kwargs and kwargs["timeout"] <= 1:
logger.warning(
f"IMDClient: timeout value of {kwargs['timeout']} second(s) is very low and may lead to "
"premature disconnection by the client. Consider using a higher value (default is 600 seconds)."
)

self._stopped = False
self._conn = self._connect_to_server(host, port, socket_bufsize)
self._imdsinfo = self._await_IMD_handshake()
Expand Down Expand Up @@ -169,21 +177,26 @@ def get_imdframe(self):
if self._multithreaded:
try:
return self._buf.pop_full_imdframe()
except EOFError:
except EOFError as e:
# in this case, consumer is already finished
# and doesn't need to be notified
logger.debug(f"IMDClient: Multithreaded connection ended")
self._disconnect()
self._stopped = True

if self._error_queue.qsize():
raise EOFError(f"{self._error_queue.get()}")
raise EOFError
try:
error = self._error_queue.get_nowait()
except queue.Empty:
raise EOFError from e
else:
raise EOFError(str(error)) from error
else:
try:
return self._producer._get_imdframe()
except EOFError:
except EOFError as e:
logger.debug(f"IMDClient: Single-threaded connection ended")
self._disconnect()
raise EOFError
raise EOFError from e
Comment on lines 194 to +199
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This block is never tested. Do we not test the single-threaded client?

I think we should do test it at least with a simple test. Otherwise we're advertising code for which we don't even know if it runs correctly.


def get_imdsessioninfo(self):
"""
Expand Down Expand Up @@ -241,7 +254,9 @@ def _await_IMD_handshake(self) -> IMDSessionInfo:
read_into_buf(self._conn, h_buf)
except (ConnectionError, TimeoutError, Exception) as e:
logger.debug("IMDClient: No handshake packet received: %s", e)
raise ConnectionError("IMDClient: No handshake packet received")
raise ConnectionError(
"IMDClient: No handshake packet received"
) from e

header = IMDHeader(h_buf)

Expand Down Expand Up @@ -362,7 +377,7 @@ class BaseIMDProducer(threading.Thread):
error_queue: queue.Queue
Queue to hold errors produced by the producer thread
timeout : int, optional
Timeout for the socket in seconds [``5``]
Timeout for the socket in seconds [``600``]
"""

def __init__(
Expand All @@ -373,7 +388,7 @@ def __init__(
n_atoms,
multithreaded,
error_queue,
timeout=5,
timeout=600,
**kwargs,
):
super(BaseIMDProducer, self).__init__(daemon=True)
Expand Down Expand Up @@ -424,7 +439,8 @@ def _get_imdframe(self):
try:
self._parse_imdframe()
except EOFError as e:
raise EOFError
logger.debug(f"IMDProducer: No more frames to read: {e}")
raise EOFError from e
except Exception as e:
raise RuntimeError("An unexpected error occurred") from e
Comment on lines 439 to 445
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None of these lines are covered.


Expand Down Expand Up @@ -468,11 +484,13 @@ def run(self):
self._frame,
t.elapsed,
)
except EOFError:
except EOFError as e:
# simulation ended in a way
# that we expected
# i.e. consumer stopped or read_into_buf didn't find
# full token of data
logger.debug("IMDProducer: %s", e)
self.error_queue.put(e)
logger.debug("IMDProducer: Simulation ended normally, cleaning up")
except Exception as e:
logger.debug("IMDProducer: An unexpected error occurred: %s", e)
Expand Down Expand Up @@ -513,13 +531,19 @@ def _read(self, buf):
"""Wraps `read_into_buf` call to give uniform error handling which indicates end of stream"""
try:
read_into_buf(self._conn, buf)
except (ConnectionError, TimeoutError, BlockingIOError, Exception):
# ConnectionError: Server is definitely done sending frames, socket is closed
# TimeoutError: Server is *likely* done sending frames.
# BlockingIOError: Occurs when timeout is 0 in place of a TimeoutError. Server is *likely* done sending frames
# OSError: Occurs when main thread disconnects from the server and closes the socket, but producer thread attempts to read another frame
# Exception: Something unexpected happened
raise EOFError
except ConnectionError as e:
raise EOFError("Server is definitely done sending frames") from e
except TimeoutError as e:
raise EOFError("Server is likely done sending frames") from e
except BlockingIOError as e:
raise EOFError("Server is likely done sending frames") from e
except Exception as e:
raise EOFError("Something unexpected happened") from e


class IMDProducerV2(BaseIMDProducer):
Expand Down Expand Up @@ -597,7 +621,9 @@ def _pause(self):
self._conn.sendall(pause)
except ConnectionResetError as e:
# Simulation has already ended by the time we paused
raise EOFError
raise EOFError(
"Simulation has already ended by the time we paused"
) from e
# Edge case: pause occured in the time between server sends its last frame
# and closing socket
# Simulation is not actually paused but is over, but we still want to read remaining data
Expand All @@ -612,7 +638,9 @@ def _unpause(self):
# Edge case: pause occured in the time between server sends its last frame
# and closing socket
# Simulation was never actually paused in this case and is now over
raise EOFError
raise EOFError(
"Simulation was never actually paused as pause was sent after the last frame; simulation is now over"
) from e
# Edge case: pause & unpause occured in the time between server sends its last frame and closing socket
# in this case, the simulation isn't actually unpaused but over

Expand Down Expand Up @@ -662,7 +690,9 @@ def _pause(self):
self._conn.sendall(pause)
except ConnectionResetError as e:
# Simulation has already ended by the time we paused
raise EOFError
raise EOFError(
"Simulation has already ended by the time we paused"
) from e
# Edge case: pause occured in the time between server sends its last frame
# and closing socket
# Simulation is not actually paused but is over, but we still want to read remaining data
Expand All @@ -677,7 +707,9 @@ def _unpause(self):
# Edge case: pause occured in the time between server sends its last frame
# and closing socket
# Simulation was never actually paused in this case and is now over
raise EOFError
raise EOFError(
"Simulation was never actually paused as pause was sent after the last frame; simulation is now over"
) from e
# Edge case: pause & unpause occured in the time between server sends its last frame and closing socket
# in this case, the simulation isn't actually unpaused but over

Expand Down Expand Up @@ -852,9 +884,10 @@ def wait_for_space(self):

if self._consumer_finished:
logger.debug("IMDProducer: Noticing consumer finished")
raise EOFError
raise EOFError("Consumer has finished")
except Exception as e:
logger.debug(f"IMDProducer: Error waiting for space in buffer: {e}")
raise RuntimeError("Error waiting for space in buffer") from e

def pop_empty_imdframe(self):
logger.debug("IMDProducer: Getting empty frame")
Expand All @@ -870,7 +903,7 @@ def pop_empty_imdframe(self):

if self._consumer_finished:
logger.debug("IMDProducer: Noticing consumer finished")
raise EOFError
raise EOFError("Consumer has finished")

return self._empty_q.get()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This return is never tested according to coverage. Do you know why and does it matter?


Expand Down Expand Up @@ -905,7 +938,7 @@ def pop_full_imdframe(self):

if self._producer_finished and self._full_q.qsize() == 0:
logger.debug("IMDFrameBuffer(Consumer): Producer finished")
raise EOFError
raise EOFError("Producer has finished")

imdf = self._full_q.get()

Expand Down
6 changes: 4 additions & 2 deletions imdclient/tests/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def set_imdsessioninfo(self, imdsinfo):
@property
def port(self):
"""Get the port the server is bound to.

Returns:
int: The port number, or None if not bound yet.
"""
Expand All @@ -47,7 +47,9 @@ def handshake_sequence(self, host, first_frame=True):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind((host, 0)) # Bind to port 0 to get a free port
self._bound_port = s.getsockname()[1] # Store the actual bound port
logger.debug(f"InThreadIMDServer: Listening on {host}:{self._bound_port}")
logger.debug(
f"InThreadIMDServer: Listening on {host}:{self._bound_port}"
)
Comment on lines +50 to +52
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did black want to reformat this??

s.listen(60)
self.listen_socket = s

Expand Down
Loading
Loading