Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 32 additions & 35 deletions concore.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,8 @@
import numpy as np
import signal

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
force=True
)
logger = logging.getLogger('concore')
logger.addHandler(logging.NullHandler())

#these lines mute the noisy library
logging.getLogger('matplotlib').setLevel(logging.WARNING)
Expand Down Expand Up @@ -53,10 +50,10 @@ def __init__(self, port_type, address, zmq_socket_type):
# Bind or connect
if self.port_type == "bind":
self.socket.bind(address)
logging.info(f"ZMQ Port bound to {address}")
logger.info(f"ZMQ Port bound to {address}")
else:
self.socket.connect(address)
logging.info(f"ZMQ Port connected to {address}")
logger.info(f"ZMQ Port connected to {address}")

def send_json_with_retry(self, message):
"""Send JSON message with retries if timeout occurs."""
Expand All @@ -65,9 +62,9 @@ def send_json_with_retry(self, message):
self.socket.send_json(message)
return
except zmq.Again:
logging.warning(f"Send timeout (attempt {attempt + 1}/5)")
logger.warning(f"Send timeout (attempt {attempt + 1}/5)")
time.sleep(0.5)
logging.error("Failed to send after retries.")
logger.error("Failed to send after retries.")
return

def recv_json_with_retry(self):
Expand All @@ -76,9 +73,9 @@ def recv_json_with_retry(self):
try:
return self.socket.recv_json()
except zmq.Again:
logging.warning(f"Receive timeout (attempt {attempt + 1}/5)")
logger.warning(f"Receive timeout (attempt {attempt + 1}/5)")
time.sleep(0.5)
logging.error("Failed to receive after retries.")
logger.error("Failed to receive after retries.")
return None

# Global ZeroMQ ports registry
Expand All @@ -94,20 +91,20 @@ def init_zmq_port(port_name, port_type, address, socket_type_str):
socket_type_str (str): String representation of ZMQ socket type (e.g., "REQ", "REP", "PUB", "SUB").
"""
if port_name in zmq_ports:
logging.info(f"ZMQ Port {port_name} already initialized.")
logger.info(f"ZMQ Port {port_name} already initialized.")
return # Avoid reinitialization

try:
# Map socket type string to actual ZMQ constant (e.g., zmq.REQ, zmq.REP)
zmq_socket_type = getattr(zmq, socket_type_str.upper())
zmq_ports[port_name] = ZeroMQPort(port_type, address, zmq_socket_type)
logging.info(f"Initialized ZMQ port: {port_name} ({socket_type_str}) on {address}")
logger.info(f"Initialized ZMQ port: {port_name} ({socket_type_str}) on {address}")
except AttributeError:
logging.error(f"Error: Invalid ZMQ socket type string '{socket_type_str}'.")
logger.error(f"Error: Invalid ZMQ socket type string '{socket_type_str}'.")
except zmq.error.ZMQError as e:
logging.error(f"Error initializing ZMQ port {port_name} on {address}: {e}")
logger.error(f"Error initializing ZMQ port {port_name} on {address}: {e}")
except Exception as e:
logging.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")
logger.error(f"An unexpected error occurred during ZMQ port initialization for {port_name}: {e}")

def terminate_zmq():
"""Clean up all ZMQ sockets and contexts before exit."""
Expand All @@ -127,7 +124,7 @@ def terminate_zmq():
port.context.term()
print(f"Closed ZMQ port: {port_name}")
except Exception as e:
logging.error(f"Error while terminating ZMQ port {port.address}: {e}")
logger.error(f"Error while terminating ZMQ port {port.address}: {e}")
zmq_ports.clear()
_cleanup_in_progress = False

Expand Down Expand Up @@ -242,9 +239,9 @@ def parse_params(sparams: str) -> dict:
sparams = sparams[1:-1]

# Parse params using clean function instead of regex
logging.debug("parsing sparams: "+sparams)
logger.debug("parsing sparams: "+sparams)
params = parse_params(sparams)
logging.debug("parsed params: " + str(params))
logger.debug("parsed params: " + str(params))
else:
params = dict()
else:
Expand Down Expand Up @@ -306,17 +303,17 @@ def read(port_identifier, name, initstr_val):
return message[1:]
return message
except zmq.error.ZMQError as e:
logging.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
logger.error(f"ZMQ read error on port {port_identifier} (name: {name}): {e}. Returning default.")
return default_return_val
except Exception as e:
logging.error(f"Unexpected error during ZMQ read on port {port_identifier} (name: {name}): {e}. Returning default.")
logger.error(f"Unexpected error during ZMQ read on port {port_identifier} (name: {name}): {e}. Returning default.")
return default_return_val

# Case 2: File-based port
try:
file_port_num = int(port_identifier)
except ValueError:
logging.error(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
logger.error(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
return default_return_val

time.sleep(delay)
Expand All @@ -330,7 +327,7 @@ def read(port_identifier, name, initstr_val):
ins = str(initstr_val)
s += ins # Update s to break unchanged() loop
except Exception as e:
logging.error(f"Error reading {file_path}: {e}. Using default value.")
logger.error(f"Error reading {file_path}: {e}. Using default value.")
return default_return_val

# Retry logic if file is empty
Expand All @@ -342,12 +339,12 @@ def read(port_identifier, name, initstr_val):
with open(file_path, "r") as infile:
ins = infile.read()
except Exception as e:
logging.warning(f"Retry {attempts + 1}: Error reading {file_path} - {e}")
logger.warning(f"Retry {attempts + 1}: Error reading {file_path} - {e}")
attempts += 1
retrycount += 1

if len(ins) == 0:
logging.error(f"Max retries reached for {file_path}, using default value.")
logger.error(f"Max retries reached for {file_path}, using default value.")
return default_return_val

s += ins
Expand All @@ -361,10 +358,10 @@ def read(port_identifier, name, initstr_val):
simtime = max(simtime, current_simtime_from_file)
return inval[1:]
else:
logging.warning(f"Warning: Unexpected data format in {file_path}: {ins}. Returning raw content or default.")
logger.warning(f"Warning: Unexpected data format in {file_path}: {ins}. Returning raw content or default.")
return inval
except Exception as e:
logging.error(f"Error parsing content from {file_path} ('{ins}'): {e}. Returning default.")
logger.error(f"Error parsing content from {file_path} ('{ins}'): {e}. Returning default.")
return default_return_val


Expand All @@ -389,24 +386,24 @@ def write(port_identifier, name, val, delta=0):
else:
zmq_p.send_json_with_retry(zmq_val)
except zmq.error.ZMQError as e:
logging.error(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
logger.error(f"ZMQ write error on port {port_identifier} (name: {name}): {e}")
except Exception as e:
logging.error(f"Unexpected error during ZMQ write on port {port_identifier} (name: {name}): {e}")
logger.error(f"Unexpected error during ZMQ write on port {port_identifier} (name: {name}): {e}")
return

# Case 2: File-based port
try:
file_port_num = int(port_identifier)
file_path = os.path.join(outpath + str(file_port_num), name)
except ValueError:
logging.error(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
logger.error(f"Error: Invalid port identifier '{port_identifier}' for file operation. Must be integer or ZMQ name.")
return

# File writing rules
if isinstance(val, str):
time.sleep(2 * delay) # string writes wait longer
elif not isinstance(val, list):
logging.error(f"File write to {file_path} must have list or str value, got {type(val)}")
logger.error(f"File write to {file_path} must have list or str value, got {type(val)}")
return

try:
Expand All @@ -420,7 +417,7 @@ def write(port_identifier, name, val, delta=0):
else:
outfile.write(val)
except Exception as e:
logging.error(f"Error writing to {file_path}: {e}")
logger.error(f"Error writing to {file_path}: {e}")

def initval(simtime_val_str):
"""
Expand All @@ -436,12 +433,12 @@ def initval(simtime_val_str):
simtime = first_element
return val[1:]
else:
logging.error(f"Error: First element in initval string '{simtime_val_str}' is not a number. Using data part as is or empty.")
logger.error(f"Error: First element in initval string '{simtime_val_str}' is not a number. Using data part as is or empty.")
return val[1:] if len(val) > 1 else []
else:
logging.error(f"Error: initval string '{simtime_val_str}' is not a list or is empty. Returning empty list.")
logger.error(f"Error: initval string '{simtime_val_str}' is not a list or is empty. Returning empty list.")
return []

except Exception as e:
logging.error(f"Error parsing simtime_val_str '{simtime_val_str}': {e}. Returning empty list.")
logger.error(f"Error parsing simtime_val_str '{simtime_val_str}': {e}. Returning empty list.")
return []