Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 199 additions & 20 deletions src/cditools/motors.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,46 @@
from __future__ import annotations

from pathlib import Path
from typing import ClassVar

import numpy as np
from ophyd import Component as Cpt # type: ignore[import-not-found]
from ophyd import Device, EpicsMotor, PseudoPositioner, PseudoSingle
from ophyd import (
Device,
EpicsMotor,
PseudoPositioner,
PseudoSingle,
Signal,
)
from ophyd import DynamicDeviceComponent as DDC
from ophyd.pseudopos import (
pseudo_position_argument,
real_position_argument,
)
from scipy.interpolate import make_interp_spline


class EpicsMotorRO(EpicsMotor):
def __init__(self, *args, **kwargs):
def __init__(self, *args: object, **kwargs: object):
super().__init__(*args, **kwargs)

def move(self, *args, **kwargs): # noqa: ARG002
def move(self, *args: object, **kwargs: object): # noqa: ARG002
msg = f"{self.name} is read-only and cannot be moved."
raise PermissionError(msg)

def stop(self, *args, **kwargs): # noqa: ARG002
def stop(self, *args: object, **kwargs: object): # noqa: ARG002
msg = f"{self.name} is read-only and cannot be stopped manually."
raise PermissionError(msg)

def set(self, *args, **kwargs): # noqa: ARG002
def set(self, *args: object, **kwargs: object): # noqa: ARG002
msg = f"{self.name} is read-only and cannot be set."
raise PermissionError(msg)

def set_position(self, *args, **kwargs): # noqa: ARG002
def set_position(self, *args: object, **kwargs: object): # noqa: ARG002
msg = f"{self.name} is read-only and its position cannot be set."
raise PermissionError(msg)

def _readonly_put(self, *args, **kwargs): # noqa: ARG002
def _readonly_put(self, *args: object, **kwargs: object): # noqa: ARG002
msg = f"{self.name} is read-only and cannot write PVs."
raise PermissionError(msg)

Expand Down Expand Up @@ -77,7 +85,7 @@ class DMM(Device):

class DCMBase(Device):
pitch = Cpt(EpicsMotor, "Mono:HDCM-Ax:Pitch}Mtr")
fine: ClassVar[dict] = {
fine: ClassVar[dict[str, Cpt]] = {
"fpitch": Cpt(EpicsMotor, "Mono:HDCM-Ax:FP}Mtr"),
"roll": Cpt(EpicsMotor, "Mono:HDCM-Ax:Roll}Mtr"),
}
Expand All @@ -91,56 +99,227 @@ class Energy(PseudoPositioner):
# Synthetic Axis
energy = Cpt(PseudoSingle, egu="KeV")

egu = Cpt(Signal, None, add_prefix=(), value="keV", kind="config")
motor_egu = Cpt(Signal, None, add_prefix=(), value="eV", kind="config")

### not sure what PV should be used for the insertion device, example from SRX
# u_gap = Cpt(InsertionDevice, "SR:C5-ID:G1{IVU21:1")
_u_gap_offset = 0
### same for c2_x
# c2_x = Cpt(EpicsMotor, "XF:05IDA-OP:1{Mono:HDCM-Ax:X2}Mtr", add_prefix=(), read_attrs=["user_readback"])
# epics_d_spacing = EpicsSignal("XF:05IDA-CT{IOC:Status01}DCMDspacing.VAL")
# epics_bragg_offset = EpicsSignal("XF:05IDA-CT{IOC:Status01}BraggOffset.VAL")

# Energy "limits"
_low = 5.0 # TODO: CHECK THIS VALUE
_high = 15.0 # TODO: CHECK THIS VALUE
_low = 5.0 # TODO: CHECK THIS VALUE, SRX uses 4.4
_high = 15.0 # TODO: CHECK THIS VALUE, SRX uses 25

# Set up constants
Xoffset = 20.0 # mm
d_111 = 3.1286911960950756
ANG_OVER_KEV = 12.3984

def __init__(self, *args, **kwargs):
# Motor enable flags
move_u_gap = Cpt(Signal, None, add_prefix=(), value=True)
move_c2_x = Cpt(Signal, None, add_prefix=(), value=True)
harmonic = Cpt(Signal, None, add_prefix=(), value=0, kind="config")
selected_harmonic = Cpt(Signal, None, add_prefix=(), value=0)

# Experimental
detune = Cpt(Signal, None, add_prefix=(), value=0)

def __init__(
self,
*args: object,
delta_bragg: int = 0,
xoffset: int = 0,
C2Xcal: int = 0,
T2cal: int = 0,
d_111: int = 0,
**kwargs: object,
):
super().__init__(*args, **kwargs)
self._delta_bragg = delta_bragg
self._xoffset = xoffset
self._C2Xcal = C2Xcal
self._T2cal = T2cal
self._d_111 = d_111
self.energy.readback.name = "energy"
self.energy.setpoint.name = "energy_setpoint"

def energy_to_positions(self, target_energy: float):
calib_path = Path(__file__).parent
# this is temporary, we need to figure out if there is a calib file for CDI
calib_file = "../data/CDIUgapCalibration.txt"

with Path.open(calib_path / calib_file) as f:
next(f)
uposlistIn = []
elistIn = []
for line in f:
num = [float(x) for x in line.split()]
# Check in case there is an extra line at the end of the calibration file
if len(num) == 2:
uposlistIn.append(num[0])
elistIn.append(num[1])

self.etoulookup = make_interp_spline(elistIn, uposlistIn)
self.utoelookup = make_interp_spline(uposlistIn, elistIn)

# can't do this until we know the insertion device
# self.u_gap.gap.user_reacback.name = self.u_gap.name

def energy_to_positions(
self,
target_energy: float,
undulator_harmonic: int = 0,
u_detune: float = 0.0,
):
"""Compute undulator and mono positions given a target energy

Parameters
----------
target_energy : float
Target energy in keV
undulator_harmonic : int
The harmonic in the undulator to use
u_detune : float
Amount to 'mistune' the undulator in keV

Returns
-------
bragg : float
The angle to set the monocromotor in radians
gap : float
The gap position in millimeters
C2X : float
The C2X position in millimeters
ugap : float
The undulator gap position in microns
"""

# Calculate Bragg RBV
bragg = np.arcsin((self.ANG_OVER_KEV / target_energy) / (2 * self.d_111))
bragg_RBV = (
np.arcsin((self.ANG_OVER_KEV / target_energy) / (2 * self.d_111))
- self._delta_bragg
)
bragg = bragg_RBV + self._delta_bragg
T2 = self._xoffset + np.sin(bragg * np.pi / 180) / np.sin(
2 * bragg * np.pi / 180
)
dT2 = T2 - self._T2cal
C2X = self._C2Xcal - dT2

# Calculate C2X
gap = self.Xoffset / 2 / np.cos(bragg)
gap = self._xoffset / 2 / np.cos(bragg)
ugap = float(self.etoulookup((target_energy + u_detune) / undulator_harmonic))
ugap *= 1000
ugap = ugap + self._u_gap_offset

return bragg, gap
return bragg, gap, C2X, ugap

# def undulator_energy(self, harmonic: int = 3):
# ugap = self.u_gap.gap.user_readback.get() / 1000

# utoelookup = self.utoelookup
# cannot do thids until we know ugap for insertion device
# fundamental = float(utoelookup(ugap))
# energy = fundamental * harmonic

@pseudo_position_argument
def forward(self, p_pos):
def forward(self, p_pos: object):
energy = p_pos.energy # energy assumed in keV
bragg, gap = self.energy_to_positions(energy)
return self.RealPosition(bragg=np.rad2deg(bragg), cgap=gap)
bragg, gap, _, _ = self.energy_to_positions(energy)
harmonic = self.harmonic.get()
if harmonic < 0 or ((harmonic % 2) == 0 and harmonic != 0):
msg = f"The harmonic must be 0 or odd and positive, you set {harmonic}. Set `energy.harmonic` to a positive odd integer or 0."
raise RuntimeError(msg)
detune = self.detune.get()
if energy <= self._low:
msg = f"The energy you entered is too low ({energy} keV). "
msg += f"Minimum energy = {self._low:.1f} keV"
raise ValueError(msg)
if energy > self._high:
if (energy < self._low * 1000) or (energy > self._high * 1000):
# Energy is invalid
msg = f"The requested photon energy is invalid ({energy} keV). "
msg += f"Values must be in the range of {self._low:.1f} - {self._high:.1f} keV"
raise ValueError(msg)
# Energy is in eV
energy = energy / 1000.0

if harmonic < 3:
harmonic = 3
# Choose the right harmonic
braggcal, c2xcal, ugapcal = self.energy_to_positions(
energy, harmonic, detune
)
# Try higher harmonics until the required gap is too small
while True:
braggcal, c2xcal, ugapcal = self.energy_to_positions(
energy, harmonic + 2, detune
)
if ugapcal < self.u_gap.low_limit:
break
harmonic += 2

self.selected_harmonic.put(harmonic)

# Compute where we would move everything to in a perfect world
bragg, c2_x, u_gap = self.energy_to_positions(energy, harmonic, detune)

# Sometimes move the crystal gap
if not self.move_c2_x.get():
c2_x = self.c2_x.position

# Sometimes move the undulator
if not self.move_u_gap.get():
u_gap = self.u_gap.position

return self.RealPosition(bragg=np.rad2deg(bragg), c2_x=c2_x, cgap=u_gap)

@real_position_argument
def inverse(self, r_pos):
def inverse(self, r_pos: object):
bragg = np.deg2rad(r_pos.bragg)
e = self.ANG_OVER_KEV / (2 * self.d_111 * np.sin(bragg))
return self.PseudoPosition(energy=float(e))

@pseudo_position_argument
def set(self, position: list[int | float]):
return super().set([float(_) for _ in position])

def sync_with_epics(self):
self.epics_d_spacing.put(self._d_111)
self.epics_bragg_offset.put(self._delta_bragg)

def retune_undulator(self):
self.detune.put(0.0)
self.move(self.engergy.get()[0])

# def mono_peakup(element, acquisition_time=1.0, peakup=True):
# """
# First draft of the mono peakup scan
# Need more info about the axis to be scanned, the move ID, and which detector will be used for feedback.
# Args:
# element (string): element name
# acquisition_time (float, optional): _description_. Defaults to 1.0.
# peakup (bool, optional): _description_. Defaults to True.
# """
# getemissionE(element)
# energy_x = getbindingsE(element)

# yield from mov(energy, energy_x)
# setroi(1, element)
# if peakup:
# yield from bps.sleep(5)
# yield from peakup()
# yield from xanes_plan(
# erange=[energy_x - 100, energy_x + 50],
# estep=[1.0],
# samplename=f"{element}Foil",
# filename=f"{element}Foilstd",
# acqtime=acquisition_time,
# shutter=True,
# )


class VPM(Device):
fs = DDC(
Expand Down
Loading