Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions daemons/hsfei/atcfwheel
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
#!/usr/bin/python3.12
'''Module for the ATC Filter Wheel Daemon'''
import logging
import configparser
from libby.daemon import LibbyDaemon # pyright: ignore[reportMissingImports]
from hispec.util.thorlabs.fw102c import FilterWheelController #pylint: disable = E0401,E0611
#from fw102c import FilterWheelController # Assuming fw102c.py is in the same directory

class Atcfwheel(LibbyDaemon): #pylint: disable = W0223
'''Daemon for controlling the ATC Filter Wheel via Thorlabs FW102C controller'''
peer_id = "atcfwheel"
group_id = "hsfei"

# RabbitMQ on hispec
transport = "rabbitmq"
discovery_enabled = False
discovery_interval_s = 5.0
rabbitmq_url = "amqp://localhost"
daemon_desc = "ATC FWheel"
named_positions = {}

# pub/sub topics
topics = {}

def __init__(self):
"""Initialize the pickoff daemon.

Args: come from the hsfei configuration file
"""
#on start set up the daemon from config and initialize device
config = configparser.ConfigParser()
config.read('hsfei.config')
self.host = config["atcfwheel"]["host"]
self.port = int(config["atcfwheel"]["port"])

self.dev = FilterWheelController()

# Set up logging (temporary UNTIL LIBBY LOGGING IS IMPLEMENTED)
self.logger = logging.getLogger(__name__)
self.logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
self.logger.addHandler(ch)


# Daemon state
self.state = {
'connected': False,
'error': ''
}

# Call parent __init__ first
super().__init__()

def on_start(self, libby):
'''Starts up daemon and initializies the hardware device'''
self.logger.info("Starting %s Daemon", self.daemon_desc)
self.add_services({
"connect": lambda p: self.connect(),
"disconnect": lambda p: self.disconnect(),
"initialize": lambda p: self.initialize(),
"status": lambda p: self.status(),
"position.get": lambda p: self.get_pos(),
"position.set": lambda p: self.set_pos(pos = p.get("position")),
"position.get_named": lambda p: self.get_named_position(),
"position.set_named": lambda p: self.goto_named_pos(name = p.get("named_pos"))
})
try:
self.connect()
self.logger.info("Connected to %s", self.daemon_desc)
self.initialize()
self.logger.info("Initialized %s", self.daemon_desc)
self.load_named_pos()
#publish to libby
libby.publish("atcfwheel", {"Daemon Startup": "Success"})
except Exception as e: # pylint: disable=W0718
self.logger.error("Error Connecting and Initializing %s: %s", self.daemon_desc, e)
#publish failure
libby.publish("atcfwheel", {"Daemon Startup": "Failed", "Connection Error": f"{e}"})

def on_stop(self, libby) -> None: #pylint: disable=W0222
'''Stops the daemon and disconnects from hardware device'''
try:
self.disconnect()
self.logger.info("Disconnected %s", self.daemon_desc)
libby.publish("atcfwheel", {"Daemon Shutdown": "Success"})
except Exception as e: # pylint: disable=W0718
libby.publish("atcfwheel", {"Daemon Startup": "Failed", "Error":f"{e}"})
self.logger.error("Disconnect %s:: Failed ", self.daemon_desc)


def connect(self):
"""handles connection"""
try:
self.dev.connect(host = self.host, port = self.port)
self.logger.info("Connected %s", self.daemon_desc)
except Exception as e: # pylint: disable=W0718
self.logger.error("Error: %s",e)
return {"Connect": "Failed", "Error": f"{e}"}
return {"Connect": "Success"}

def disconnect(self):
"""handles disconnection"""
try:
self.dev.disconnect()
self.logger.info("Disconnected from %s", self.daemon_desc)
except Exception as e: # pylint: disable=W0718
self.logger.error("Error: %s",e)
return {"Disconnect": "Failed", "Error": f"{e}"}
return {"Disconnect": "Success"}

def initialize(self):
"""handles initialization"""
# for PPC102_Coms, this involves setting the enabled status
try:
self.dev.initialize()
except Exception as e: # pylint: disable=W0718
self.logger.error("Error: %s",e)
return {"Initialize": "Failed", "Error": f"{e}"}
return {"Initialize": "Success"}

def load_named_pos(self):
"""loads named positions into the device from config file"""
config = configparser.ConfigParser()
config.read('hsfei.config')
try:
for name, pos in config["atcfwheel-named_pos"].items():
self.named_positions[name] = int(pos)
self.logger.info("Loaded named positions for %s", self.daemon_desc)
except Exception as e: # pylint: disable=W0718
self.logger.error("Error: %s",e)
return {"Load Named Positions": "Failed", "Error": f"{e}"}
return {"Load Named Positions": "Success"}

def status(self):
"""handles status"""
try:
status = self.dev.get_status()
self.logger.debug("status: %s",status)
except Exception as e: # pylint: disable=W0718
self.logger.error("Error: %s",e)
return {"status": "Failed", "Error": f"{e}"}
return {"status": status}

def get_pos(self):
'''gets current position'''
try:
position = self.dev.get_pos()
self.logger.debug("get_pos: %s",position)
except Exception as e: # pylint: disable=W0718
self.logger.error("Error: %s",e)
return {"get_pos": "Failed", "Error": f"{e}"}
return {"position": str(position)}

def set_pos(self, pos):
'''sets current position'''
try:
pos = int(pos)
self.dev.set_pos(pos)
self.logger.debug("set_pos: %d",pos)
except Exception as e: # pylint: disable=W0718
self.logger.error("Error: %s",e)
return {"Move": "Failed", "Error": f"{e}"}
return {"Move": "Success"}

def goto_named_pos(self, name):
'''moves to named position'''
try:
goal = self.named_positions.get(name.lower())
if goal is not None:
self.dev.set_pos(int(goal))
self.logger.debug("goto_named_pos: %s -> %s",name,goal)
except Exception as e: # pylint: disable=W0718
self.logger.error("Error: %s",e)
return {"Move to Named Position": "Failed", "Error": f"{e}"}
return {"move": "Success"}

def get_named_position(self):
'''returns current named position'''
current = self.dev.get_pos()
for name, pos in self.named_positions.items():
if pos == current:
return {"named_pos":f"{name}"}
return {"named_pos": "Unknown"}

if __name__ == "__main__":
Atcfwheel().serve()
17 changes: 17 additions & 0 deletions daemons/hsfei/hsfei.config
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Config file for daemon references across FEI daemons

[Device Control]
atcpress_host = feiinficon
atcpress_port = 8000

[atcfwheel]
host = 192.168.29.100
port = 10010

[atcfwheel-named_pos]
clear = 1
nd2 = 2
nd3 = 3
nd4 = 4
nd5 = 5
nd6 = 6
Loading