diff --git a/pyproject.toml b/pyproject.toml
index 15e329c..157698d 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -27,6 +27,7 @@ dynamic = ["version"]
aps_mic = [
"bluesky",
"mic-instrument@git+https://github.com/BCDA-APS/bluesky-mic.git",
+ "mic-vis@git+https://github.com/grace227/mic-vis",
"h5py",
]
asksage = [
diff --git a/src/eaa/maths.py b/src/eaa/maths.py
index 764b603..031e9d9 100644
--- a/src/eaa/maths.py
+++ b/src/eaa/maths.py
@@ -47,11 +47,24 @@ def fit_gaussian_1d(
Returns
-------
tuple[float, float, float]
- The amplitude, mean, and standard deviation of the Gaussian.
+ The amplitude, mean, standard deviation, and constant offset of the Gaussian.
"""
y_max, y_min = np.max(y), np.min(y)
x_max = x[np.argmax(y)]
+ offset = x_max
+ x = x - offset
+ x_max = 0
mask = y >= y_min + y_threshold * (y_max - y_min)
- p0 = [y_max - y_min, x_max, np.count_nonzero(mask) / (x[-1] - x[0]) / 2, y_min]
+ a_guess = y_max - y_min
+ mu_guess = x_max
+ x_above_thresh = x[y > y_min + a_guess * 0.2]
+ if len(x_above_thresh) >= 3:
+ sigma_guess = (x_above_thresh.max() - x_above_thresh.min()) / 2
+ else:
+ sigma_guess = (x.max() - x.min()) / 2
+ c_guess = y_min
+ p0 = [a_guess, mu_guess, sigma_guess, c_guess]
popt, _ = scipy.optimize.curve_fit(gaussian_1d, x[mask], y[mask], p0=p0)
+ popt[1] += offset
return tuple(popt)
+
diff --git a/src/eaa/task_managers/base.py b/src/eaa/task_managers/base.py
index dc80a5d..df6078e 100644
--- a/src/eaa/task_managers/base.py
+++ b/src/eaa/task_managers/base.py
@@ -441,6 +441,8 @@ def launch_task_manager(self, task_request: str) -> None:
f"tools:\n{tool_catalog_json}\n"
f"User request: {request_text}"
)
+ # Escape image tag
+ parsing_prompt = parsing_prompt.replace("
None:
context=local_context,
return_outgoing_message=True,
)
- self.update_message_history(outgoing, update_context=False, update_full_history=True)
self.update_message_history(response, update_context=False, update_full_history=True)
local_context.append(outgoing)
local_context.append(response)
@@ -560,6 +561,12 @@ def launch_task_manager(self, task_request: str) -> None:
init_kwargs[key] = value
try:
+ log_message = generate_openai_message(
+ content=f"Instantiating task manager '{manager_name}' with init_kwargs: {init_kwargs}",
+ role="system",
+ )
+ self.update_message_history(log_message, update_context=True, update_full_history=True)
+ print_message(log_message)
sub_manager = manager_class(**init_kwargs)
except Exception as exc: # noqa: BLE001 - surface configuration errors
logger.exception("Failed to instantiate task manager '%s'", manager_name)
@@ -577,6 +584,16 @@ def launch_task_manager(self, task_request: str) -> None:
result = sub_manager.run_conversation()
else:
method_callable = getattr(sub_manager, resolved_method_name)
+ log_message = generate_openai_message(
+ content=f"Running method '{resolved_method_name}' with method_kwargs: {method_kwargs}. Proceed? (yes/no)",
+ role="system",
+ )
+ proceed = self.get_user_input(
+ prompt=log_message["content"],
+ display_prompt_in_webui=bool(self.message_db_conn),
+ )
+ if proceed.strip().lower() != "yes":
+ return
result = method_callable(**method_kwargs)
last_of_sub_manager_context = sub_manager.context[-1] if len(sub_manager.context) > 0 else None
except Exception as exc: # noqa: BLE001
@@ -625,77 +642,90 @@ def run_conversation(
"""
response = None
while True:
- if response is None or (response is not None and not has_tool_call(response)):
- message = self.get_user_input(
- prompt=(
- "Enter a message (/exit: exit; /return: return to upper level task; "
- "/help: show command help): "
+ try:
+ if response is None or (response is not None and not has_tool_call(response)):
+ message = self.get_user_input(
+ prompt=(
+ "Enter a message (/exit: exit; /return: return to upper level task; "
+ "/help: show command help): "
+ )
)
- )
- stripped_message = message.strip()
- command, _, remainder = stripped_message.partition(" ")
- command_lower = command.lower()
+ stripped_message = message.strip()
+ command, _, remainder = stripped_message.partition(" ")
+ command_lower = command.lower()
- if command_lower == "/exit" and remainder == "":
- break
- elif command_lower == "/return" and remainder == "":
- return
- elif command_lower == "/monitor":
- if len(remainder.strip()) == 0:
- logger.info("Monitoring command requires a task description.")
- else:
- self.enter_monitoring_mode(remainder.strip())
- continue
- elif command_lower == "/subtask":
- self.launch_task_manager(remainder.strip())
- continue
- elif command_lower == "/help" and remainder == "":
- self.display_command_help()
- continue
-
- # Send message and get response
- response, outgoing_message = self.agent.receive(
- message,
- context=self.context,
- return_outgoing_message=True
+ if command_lower == "/exit" and remainder == "":
+ break
+ elif command_lower == "/return" and remainder == "":
+ return
+ elif command_lower == "/monitor":
+ if len(remainder.strip()) == 0:
+ logger.info("Monitoring command requires a task description.")
+ else:
+ self.enter_monitoring_mode(remainder.strip())
+ continue
+ elif command_lower == "/subtask":
+ self.launch_task_manager(remainder.strip())
+ continue
+ elif command_lower == "/help" and remainder == "":
+ self.display_command_help()
+ continue
+
+ # Send message and get response
+ response, outgoing_message = self.agent.receive(
+ message,
+ context=self.context,
+ return_outgoing_message=True
+ )
+ # If message DB is used, user input should come from WebUI which writes
+ # to the DB, so we don't update DB again.
+ self.update_message_history(
+ outgoing_message,
+ update_context=True,
+ update_full_history=True,
+ update_db=(self.message_db_conn is None)
+ )
+ self.update_message_history(response, update_context=True, update_full_history=True)
+
+ # Handle tool calls
+ tool_responses, tool_response_types = self.agent.handle_tool_call(response, return_tool_return_types=True)
+ for tool_response, tool_response_type in zip(tool_responses, tool_response_types):
+ print_message(tool_response)
+ self.update_message_history(tool_response, update_context=True, update_full_history=True)
+
+ if len(tool_responses) >= 1:
+ for tool_response, tool_response_type in zip(tool_responses, tool_response_types):
+ # If the tool returns an image path, load the image and send it to
+ # the assistant in a follow-up message as user.
+ if tool_response_type == ToolReturnType.IMAGE_PATH:
+ image_path = tool_response["content"]
+ image_message = generate_openai_message(
+ content="Here is the image the tool returned.",
+ image_path=image_path,
+ role="user",
+ )
+ self.update_message_history(
+ image_message, update_context=store_all_images_in_context, update_full_history=True
+ )
+ # Send tool responses stored in the context
+ response = self.agent.receive(
+ message=None,
+ context=self.context,
+ return_outgoing_message=False
+ )
+ self.update_message_history(response, update_context=True, update_full_history=True)
+ except KeyboardInterrupt:
+ self.context = complete_unresponded_tool_calls(self.context)
+ response = generate_openai_message(
+ content="Workflow interrupted by keyboard interrupt. TERMINATE",
+ role="system"
)
- # If message DB is used, user input should come from WebUI which writes
- # to the DB, so we don't update DB again.
self.update_message_history(
- outgoing_message,
+ response,
update_context=True,
- update_full_history=True,
- update_db=(self.message_db_conn is None)
- )
- self.update_message_history(response, update_context=True, update_full_history=True)
-
- # Handle tool calls
- tool_responses, tool_response_types = self.agent.handle_tool_call(response, return_tool_return_types=True)
- for tool_response, tool_response_type in zip(tool_responses, tool_response_types):
- print_message(tool_response)
- self.update_message_history(tool_response, update_context=True, update_full_history=True)
-
- if len(tool_responses) >= 1:
- for tool_response, tool_response_type in zip(tool_responses, tool_response_types):
- # If the tool returns an image path, load the image and send it to
- # the assistant in a follow-up message as user.
- if tool_response_type == ToolReturnType.IMAGE_PATH:
- image_path = tool_response["content"]
- image_message = generate_openai_message(
- content="Here is the image the tool returned.",
- image_path=image_path,
- role="user",
- )
- self.update_message_history(
- image_message, update_context=store_all_images_in_context, update_full_history=True
- )
- # Send tool responses stored in the context
- response = self.agent.receive(
- message=None,
- context=self.context,
- return_outgoing_message=False
+ update_full_history=True
)
- self.update_message_history(response, update_context=True, update_full_history=True)
+ continue
def enter_monitoring_mode(
self,
diff --git a/src/eaa/task_managers/imaging/feature_tracking.py b/src/eaa/task_managers/imaging/feature_tracking.py
index 13d4576..34ef9fa 100644
--- a/src/eaa/task_managers/imaging/feature_tracking.py
+++ b/src/eaa/task_managers/imaging/feature_tracking.py
@@ -121,7 +121,9 @@ def run_fov_search(
Parameters
----------
feature_description : str
- A text description of the feature to search for.
+ A text description of the feature to search for. The message
+ can contain the
tag to include a
+ reference image of the feature.
y_range : tuple[float, float]
The range of y coordinates to search for the feature.
x_range : tuple[float, float]
@@ -157,10 +159,11 @@ def run_fov_search(
f"but go back to this size when you find the feature and acquire a "
f"final image of it.\n"
f"- Start from position (y={y_range[0]}, x={x_range[0]}), and gradually "
- f"move the FOV to find the feature. Positions should not go beyond "
- f"y={y_range[1]} and x={x_range[1]}. When moving the FOV, you can start "
- f"with the step size of {step_size[0]} in the y direction and {step_size[1]} "
- f"in the x direction. You can change the step sizes during the process.\n"
+ f"move the FOV to find the feature. Positions should stay in the range of "
+ f"y={y_range[0]} to {y_range[1]} and x={x_range[0]} to {x_range[1]}. \n"
+ f"- Use a regular grid search pattern at the beginning. Use a step size of {step_size[0]} "
+ f"in the y direction and {step_size[1]} in the x direction. When you see the\n"
+ f"feature, you can move the FOV more arbitrarily to make it better centered.\n"
f"- When you find the feature, adjust the positions of the FOV to make the "
f"feature centered in the FOV. If the feature is off to the left, move "
f"the FOV to the left; if the feature is off to the top, move the FOV "
@@ -170,6 +173,7 @@ def run_fov_search(
f"stop the process.\n"
f"- When you find the feature of interest, report the coordinates of the "
f"FOV.\n"
+ f"- Explain every tool call you make."
f"- When calling tools, make only one call at a time. Do not make "
f"another call before getting the response of a previous one. \n"
f"- When you finish the search or need user response, say 'TERMINATE'.\n"
diff --git a/src/eaa/task_managers/tuning/focusing.py b/src/eaa/task_managers/tuning/focusing.py
index acea1cf..67d4263 100644
--- a/src/eaa/task_managers/tuning/focusing.py
+++ b/src/eaa/task_managers/tuning/focusing.py
@@ -15,7 +15,6 @@
from eaa.message_proc import print_message
from eaa.api.llm_config import LLMConfig
from eaa.api.memory import MemoryManagerConfig
-from eaa.util import get_image_path_from_text
import eaa.image_proc as ip
from eaa.exceptions import MaxRoundsReached
@@ -227,10 +226,13 @@ def hook_function(image_path: str) -> None:
f"drift is {[float(shift[i] + scan_pos_diff[i]) for i in [0, 1]]} (y, x). Use this offset to "
f"to adjust the line scan positions by **adding** it to both "
f"the x and y coordinates of the start and end points of the previous line scan. "
- f"For your reference, the last line scan tool call is {self.acquisition_tool.line_scan_call_history[-1]}."
- f"Also use this offset to update the argument when you perform 2D image acquisition "
- f"next time. The last 2D image acquisition call is {self.acquisition_tool.image_acquisition_call_history[-1]}."
)
+ if len(self.acquisition_tool.line_scan_call_history[-1]) > 0:
+ message += (
+ f"For your reference, the last line scan tool call is {self.acquisition_tool.line_scan_call_history[-1]}."
+ f"Also use this offset to update the argument when you perform 2D image acquisition "
+ f"next time. The last 2D image acquisition call is {self.acquisition_tool.image_acquisition_call_history[-1]}."
+ )
self.last_acquisition_count_registered = self.acquisition_tool.counter_acquire_image
return [generate_openai_message(content=message, image_path=image_path)]
@@ -303,7 +305,7 @@ def register_images(self, image_k: np.ndarray, image_km1: np.ndarray) -> np.ndar
raise ValueError(
"`image_registration_tool` should be provided in the class constructor."
)
- registration_tool = copy.deepcopy(self.image_registration_tool)
+ registration_tool = self.image_registration_tool
shift = registration_tool.register_images(
image_t=registration_tool.process_image(image_k),
image_r=registration_tool.process_image(image_km1),
@@ -372,13 +374,6 @@ def run(
additional_prompt : Optional[str]
If provided, this prompt will be added to the initial prompt.
"""
- if reference_image_path is None:
- user_image_input = self.get_user_input(
- prompt="Please provide the reference image as:
.",
- display_prompt_in_webui=True
- )
- reference_image_path = get_image_path_from_text(user_image_input)
-
if reference_image_path is None and reference_feature_description is None:
raise ValueError(
"Either `reference_image_path` or `reference_feature_description` must be provided."
@@ -389,12 +384,23 @@ def run(
"`image_registration_tool` should be provided in the class constructor "
"if `use_registration_in_workflow` is True."
)
+
+ if reference_image_path is None:
+ reference_image_prompts = ""
+ else:
+ reference_image_prompts = (
+ f"
\n"
+ f"You will see a reference 2D scan image in this message. "
+ f"This image is acquired in the region of interest that "
+ f"contains the thin feature to be line-scanned. The line scan path "
+ f"across that feature is indicated by a marker."
+ )
if initial_prompt is None:
feat_text_description = ""
if reference_feature_description is not None:
feat_text_description = (
- f"Also, here is the description of the feature: **{reference_feature_description}**. "
+ f"Here is the description of the feature: **{reference_feature_description}**. "
)
param_step_size_prompt = ""
if suggested_parameter_step_size is not None:
@@ -425,7 +431,12 @@ def run(
"Use the offset given by image registration to adjust the line scan positions."
)
else:
- registration_prompt = ""
+ registration_prompt = (
+ "Use your registration tool to find the offset between the new image "
+ "and the previous one. Use this offset to adjust the line scan positions "
+ "by **adding** it to both the x and y coordinates of the start and end "
+ "points of the previous line scan. "
+ )
line_scan_positioning_prompt = (
"Read the coordinates of the line scan path from the axis ticks."
)
@@ -438,11 +449,8 @@ def run(
f"But each time you adjust the focus, the image may drift due to "
f"the change of the optics. You will need to perform a 2D scan "
f"prior to the line scan to locate the feature that is line-scanned.\n"
- f"
\n"
- f"You will see a reference 2D scan image in this message. "
- f"This image is acquired in the region of interest that "
- f"contains the thin feature to be line-scanned. The line scan path "
- f"across that feature is indicated by a marker. {feat_text_description}\n\n"
+ f"{reference_image_prompts}\n"
+ f"{feat_text_description}\n\n"
f"Follow the procedure below to focus the microscope:\n\n"
f"1. First, perform a 2D scan of the region of interest using the "
f"\"acquire_image\" tool and the following arguments: "
@@ -475,12 +483,11 @@ def run(
f"your response to hand over control back to the user.\n\n"
f"Important notes:\n\n"
f"- Your line scan should cross only one line feature, and you should see "
- f"**exactly one peak** in the line scan plot. If there isn't one, or if there "
- f"are multiple peaks, or if the Gaussian fit looks bad, check your arguments "
+ f"**exactly one peak** in the line scan plot. If there isn't one, or if "
+ f"the Gaussian fit looks bad, check your arguments "
f"to the line scan tool and run it again. Make sure your line scan strictly "
f"follow the marker in the reference image. Do not trust the FWHM value "
- f"in the line plot if there is no peak, if the peak is incomplete, or if "
- f"there are multiple peaks!\n"
+ f"in the line plot if there is no peak, or if the peak is incomplete!\n"
f"- The line scan plot should show a complete peak. If the peak is incomplete, "
f"adjust the line scan tool's arguments to make it complete.\n"
f"- The minimal point of the FWHM is indicated by an inflection of the trend "
diff --git a/src/eaa/tools/base.py b/src/eaa/tools/base.py
index a8a4c2c..a8cd6b4 100644
--- a/src/eaa/tools/base.py
+++ b/src/eaa/tools/base.py
@@ -207,7 +207,12 @@ def resolve_json_type(py_type):
json_type = resolve_json_type(type_hints[name])
description = f"{name} parameter"
if len(get_args(sig.parameters[name].annotation)) > 0:
- description = get_args(sig.parameters[name].annotation)[1]
+ try:
+ description = get_args(sig.parameters[name].annotation)[1]
+ except IndexError:
+ raise ValueError(
+ f"The description of parameter {name} is not provided in function {func.__name__}."
+ )
properties[name] = {**json_type, "description": description}
if param.default == inspect.Parameter.empty:
required.append(name)
diff --git a/src/eaa/tools/imaging/aps_mic/acquisition.py b/src/eaa/tools/imaging/aps_mic/acquisition.py
index a66d114..0522c16 100644
--- a/src/eaa/tools/imaging/aps_mic/acquisition.py
+++ b/src/eaa/tools/imaging/aps_mic/acquisition.py
@@ -2,9 +2,16 @@
import logging
import os
-from eaa.tools.imaging.acquisition import AcquireImage
-from eaa.tools.imaging.aps_mic.util import process_xrfdata, save_xrfdata
+from eaa.tools.base import ToolReturnType, ExposedToolSpec
+from eaa.tools.imaging.aps_mic.util import (
+ process_xrfdata,
+ save_xrf_line_scan,
+ validate_position_in_range,
+ save_xrfdata
+)
from eaa.util import wait_for_file
+# from eaa.tools.imaging.aps_mic.bluesky_init import BlueskyScanControl
+from eaa.tools.imaging.acquisition import AcquireImage
logger = logging.getLogger(__name__)
@@ -12,24 +19,35 @@
class BlueSkyAcquireImage(AcquireImage):
from bluesky.run_engine import RunEngine
- from mic_common.devices.save_data import SaveDataMic
from typing import Callable
+ from mic_common.devices.save_data import SaveDataMic
+ from mic_vis.s2idd.xrf_eaa import save_xrfdata
+ from ophyd import EpicsMotor
+ import bluesky.plan_stubs as bps
name: str = "bluesky_acquire_image"
RE: RunEngine = None
- scanplan: Callable = None
savedata: SaveDataMic = None
+ scan2d_plan: Callable = None
+ scan1d_plan: Callable = None
+ samy_motor: EpicsMotor = None
+ bps: Callable = bps
def __init__(
self,
sample_name: str = "smp1",
- dwell: float = 0,
+ dwell_imaging: float = 0.05,
+ dwell_line_scan: float = 0.2,
xrf_on: bool = True,
preamp1_on: bool = False,
using_xrf_maps: bool = False,
xrf_elms: Tuple[str, ...] = ("Cr",),
+ xrf_roi_num: int = 16,
allowable_x_range: Optional[Tuple[float, float]] = None,
allowable_y_range: Optional[Tuple[float, float]] = None,
+ allowable_z_range: Optional[Tuple[float, float]] = None,
+ plot_image_in_log_scale: bool = False,
+ show_colorbar_in_image: bool = False,
require_approval: bool = False,
*args, **kwargs
):
@@ -39,8 +57,10 @@ def __init__(
----------
sample_name : str, optional
The name of the sample.
- dwell : float, optional
- The dwell time.
+ dwell_imaging : float, optional
+ The dwell time in the unit of seconds for imaging.
+ dwell_line_scan : float, optional
+ The dwell time in the unit of seconds for line scan.
xrf_on : bool, optional
Whether to collect XRF data.
preamp1_on : bool, optional
@@ -53,34 +73,45 @@ def __init__(
The allowable range of scan center position in the x direction.
allowable_y_range: Optional[Tuple[float, float]], optional
The allowable range of scan center position in the y direction.
-
+ plot_image_in_log_scale: bool, optional
+ Whether to plot the image in log scale.
+ show_colorbar_in_image: bool, optional
+ Whether to show the colorbar in the image.
+
Raises
------
ImportError
If Bluesky control initialization fails.
"""
- try:
- from eaa.tools.imaging.aps_mic.bluesky_init import RE, fly2d, get_control_components
- self.RE = RE
- self.scanplan = fly2d
- self.savedata = get_control_components("savedata")
- except ImportError:
- raise ImportError(
- "Bluesky control initialization failed. "
- "Please check that the bluesky-mic package is installed "
- "and the motors can only be reached from private subnet computers."
- )
-
+
self.sample_name = sample_name
- self.dwell = dwell
+ self.dwell_imaging = dwell_imaging
+ self.dwell_line_scan = dwell_line_scan
self.xrf_on = xrf_on
self.preamp1_on = preamp1_on
self.using_xrf_maps = using_xrf_maps
self.xrf_elms = xrf_elms
+ self.xrf_roi_num = xrf_roi_num
self.allowable_x_range = allowable_x_range
self.allowable_y_range = allowable_y_range
+ self.allowable_z_range = allowable_z_range
+ self.plot_image_in_log_scale = plot_image_in_log_scale
+ self.show_colorbar_in_image = show_colorbar_in_image
super().__init__(*args, require_approval=require_approval, **kwargs)
+ self.exposed_tools = [
+ ExposedToolSpec(
+ name="acquire_image",
+ function=self.acquire_image,
+ return_type=ToolReturnType.IMAGE_PATH,
+ ),
+ ExposedToolSpec(
+ name="acquire_line_scan",
+ function=self.acquire_line_scan,
+ return_type=ToolReturnType.IMAGE_PATH,
+ ),
+ ]
+
def acquire_image(
self,
width: Annotated[float, "The width of the scan area in microns"] = 0,
@@ -116,33 +147,25 @@ def acquire_image(
"""
self.update_image_acquisition_call_history(x_center, y_center, width, height, stepsize_x, stepsize_y)
try:
- if self.allowable_x_range:
- if x_center < self.allowable_x_range[0] or x_center > self.allowable_x_range[1]:
- raise ValueError(
- f"The scan center position in the x direction {x_center} um is out "
- f"of the allowable range {self.allowable_x_range} um."
- )
- if self.allowable_y_range:
- if y_center < self.allowable_y_range[0] or y_center > self.allowable_y_range[1]:
- raise ValueError(
- f"The scan center position in the y direction {y_center} um is out "
- f"of the allowable range {self.allowable_y_range} um."
- )
-
+ validate_position_in_range(x_center, self.allowable_x_range, "x")
+ validate_position_in_range(y_center, self.allowable_y_range, "y")
logger.info(f"Acquiring image of size {width} um x {height} um at location {x_center} um, {y_center} um.")
- self.savedata.update_next_file_name()
- self.RE(self.scanplan(
- samplename=self.sample_name,
- width=width,
- x_center=x_center,
- stepsize_x=stepsize_x,
- height=height,
- y_center=y_center,
- stepsize_y=stepsize_y,
- dwell=self.dwell,
- xrf_on=self.xrf_on,
- preamp1_on=self.preamp1_on,
- ))
+
+ if self.RE is not None:
+ self.RE(self.scan2d_plan(
+ samplename=self.sample_name,
+ width=width,
+ x_center=x_center,
+ stepsize_x=stepsize_x,
+ height=height,
+ y_center=y_center,
+ stepsize_y=stepsize_y,
+ dwell_ms=self.dwell_imaging*1000,
+ xrf_on=self.xrf_on,
+ preamp1_on=self.preamp1_on,
+ ))
+ else:
+ raise ValueError("RunEngine is not initialized.")
mda_path = self.savedata.full_path_name.get()
mda_dir = mda_path.replace("data1", "mnt/micdata1")
@@ -173,7 +196,12 @@ def acquire_image(
f"{current_mda_file}.h50")
img_path, img_arr = save_xrfdata(
- img_h5_path, png_output_dir, elms=self.xrf_elms, return_image_array=True
+ img_h5_path,
+ png_output_dir,
+ elms=self.xrf_elms,
+ return_image_array=True,
+ plot_in_log_scale=self.plot_image_in_log_scale,
+ show_colorbar_in_image=self.show_colorbar_in_image
)
wait_for_file(img_path, duration=5)
@@ -189,3 +217,113 @@ def acquire_image(
except Exception as e:
logger.error(f"Error acquiring image: {e}")
raise e
+
+
+ def acquire_line_scan(
+ self,
+ width: Annotated[float, "The width of the scan area in microns"] = 0,
+ x_center: Annotated[float, "The center of the scan area in the x direction in microns"] = None,
+ y_center: Annotated[float, "The center of the scan area in the y direction in microns"] = None,
+ stepsize_x: Annotated[float, "The scan step size in the x direction, i.e., the distance between two adjacent pixels in the x direction in microns"] = 0,
+
+ )->Annotated[str, "The path to the plot of the line scan."]:
+ """Acquire a horizontal line scan of a given width at a given center position.
+ This function returns a plot of the acquired data, and a Gaussian fit of it.
+ The FWHM of the Gaussian fit is annotated on the plot.
+
+ Parameters
+ ----------
+ width: float
+ The width of the scan area in microns.
+ x_center: float
+ The center of the scan area in the x direction in microns.
+ y_center: float
+ The center of the scan area in the y direction in microns.
+ stepsize_x: float
+ The scan step size in the x direction, i.e., the distance between
+ two adjacent pixels in the x direction in microns.
+
+ Returns
+ -------
+ str
+ The path of the plot of the line scan saved in hard drive.
+
+ """
+ self.set_motor_y(y_center)
+
+ start_x = x_center - width/2
+ end_x = x_center + width/2
+ self.update_line_scan_call_history(start_x=start_x, end_x=end_x, step=stepsize_x, start_y=None, end_y=None)
+
+ try:
+ validate_position_in_range(start_x, self.allowable_x_range, "x")
+ validate_position_in_range(end_x, self.allowable_x_range, "x")
+ logger.info(f"Acquiring line scan of width {width} um at center location of {x_center} um.")
+
+ if self.RE is not None:
+ self.RE(self.scan1d_plan(
+ samplename=self.sample_name,
+ width=width,
+ x_center=x_center,
+ stepsize_x=stepsize_x,
+ dwell_ms=self.dwell_line_scan*1000,
+ xrf_on=self.xrf_on,
+ preamp1_on=self.preamp1_on,
+ ))
+ else:
+ raise ValueError("RunEngine is not initialized.")
+
+ mda_path = self.savedata.full_path_name.get()
+ mda_dir = mda_path.replace("data1", "mnt/micdata1")
+ parent_dir = os.path.dirname(os.path.dirname(mda_dir))
+ png_output_dir = os.path.join(parent_dir, "png_output")
+ current_mda_file = self.savedata.next_file_name
+ mda_file_path = os.path.join(mda_dir, current_mda_file)
+
+ logger.info(f"About to process the data... {current_mda_file}")
+ process_code = wait_for_file(mda_file_path, duration=20)
+
+ if process_code:
+ if not os.path.exists(png_output_dir):
+ os.makedirs(png_output_dir)
+
+ img_path, _ = save_xrf_line_scan(
+ mda_file_path, png_output_dir, roi_num=self.xrf_roi_num,
+ return_line_array=True
+ )
+ wait_for_file(img_path, duration=5)
+
+ # self.update_line_scan_buffers(img_arr, psize=stepsize_x)
+ if img_path:
+ return img_path
+ else:
+ logger.error(f"Failed to save images for {current_mda_file}")
+ return f"Failed to save images for {current_mda_file}"
+ logger.error(f"Failed to process {current_mda_file}")
+ return f"Failed to process {current_mda_file}"
+
+ except Exception as e:
+ logger.error(f"Error acquiring line scan: {e}")
+ raise e
+
+ def set_motor_y(
+ self,
+ value: float
+ ) -> str:
+ """Set the sample y motor position of the imaging system.
+
+ Parameters
+ ----------
+ value: float
+ The value to set the sample y motor to.
+ """
+ if self.RE is None:
+ raise ValueError("RunEngine is not set")
+ if self.samy_motor is None:
+ raise ValueError("samz_motor is not set")
+
+ validate_position_in_range(value, self.allowable_y_range, "y")
+ self.RE(self.bps.mv(self.samy_motor, value))
+ msg = f"Move sample y motor to position: {value}"
+ logger.info(msg)
+ return msg
diff --git a/src/eaa/tools/imaging/aps_mic/bluesky_init.py b/src/eaa/tools/imaging/aps_mic/bluesky_init.py
deleted file mode 100644
index e6ab1f5..0000000
--- a/src/eaa/tools/imaging/aps_mic/bluesky_init.py
+++ /dev/null
@@ -1,13 +0,0 @@
-#initiating and loading the bluesky environment for the ISN
-
-# ruff: noqa: F403
-# from isn.startup import *
-from s2idd_uprobe.startup import *
-
-
-def get_control_components(device_name: str):
- try:
- # ruff: noqa: F405
- return oregistry[device_name]
- except KeyError:
- raise KeyError(f"Device {device_name} not found in the oregistry")
diff --git a/src/eaa/tools/imaging/aps_mic/param_tuning.py b/src/eaa/tools/imaging/aps_mic/param_tuning.py
new file mode 100644
index 0000000..1592688
--- /dev/null
+++ b/src/eaa/tools/imaging/aps_mic/param_tuning.py
@@ -0,0 +1,96 @@
+from typing import Annotated, Optional, Tuple
+import logging
+
+from eaa.tools.base import ToolReturnType, ExposedToolSpec
+from eaa.tools.imaging.param_tuning import SetParameters
+from eaa.tools.imaging.aps_mic.util import validate_position_in_range
+
+logger = logging.getLogger(__name__)
+
+
+class BlueskySetParameters(SetParameters):
+
+ from bluesky.run_engine import RunEngine
+ from typing import Callable
+ from ophyd import EpicsMotor
+ import bluesky.plan_stubs as bps
+
+ name: str = "bluesky_set_parameters"
+ samz_motor: EpicsMotor = None
+ RE: RunEngine = None
+ allowable_z_range: Optional[Tuple[float, float]] = None
+ bps: Callable = bps
+
+ def __init__(
+ self,
+ parameter_names: list[str] = ["zp-z"],
+ parameter_ranges: list[tuple[float, ...], tuple[float, ...]] = ((-200,),(-180,)),
+ require_approval: bool = False,
+ *args, **kwargs
+ ):
+ """Parameter tuning tool for the imaging system.
+
+ Parameters
+ ----------
+ parameter_names: list[str]
+ The names of the parameters.
+ parameter_ranges: list[tuple[float, ...], tuple[float, ...]]
+ The ranges of the parameters. It should be given as a list of 2 sub-lists,
+ with the first sub-list containing the lower bounds and the second
+ sub-list containing the upper bounds. These ranges will be used to
+ scale the parameter errors. As such, inf is not allowed.
+ The unit is in millimeters.
+ require_approval: bool
+ Whether to require approval for the parameter tuning.
+ """
+
+ # self.parameter_names = parameter_names
+ # self.parameter_ranges = parameter_ranges
+
+ super().__init__(
+ *args,
+ parameter_names=parameter_names,
+ parameter_ranges=parameter_ranges,
+ require_approval=require_approval,
+ **kwargs
+ )
+
+ self.exposed_tools = [
+ ExposedToolSpec(
+ name="set_parameters",
+ function=self.set_parameters,
+ return_type=ToolReturnType.TEXT,
+ )
+ ]
+
+ def set_parameters(
+ self,
+ parameters: Annotated[
+ list[float],
+ "The parameters to set the optics to. For this function, "
+ "the list should only contain one element giving the z position."
+ ]
+ ) -> str:
+ """Set the sample z motor position of the imaging system.
+
+ Parameters
+ ----------
+ parameters: list[float]
+ The parameters to set the optics to. For this function,
+ the list should only contain one element giving the z position.
+ """
+ if self.RE is None:
+ raise ValueError("RunEngine is not set")
+ if self.zp_z_motor is None:
+ raise ValueError("zp_z_motor is not set")
+ if self.parameter_ranges is not None:
+ validate_position_in_range(
+ parameters[0],
+ (self.parameter_ranges[0][0], self.parameter_ranges[1][0]),
+ "z")
+ self.RE(self.bps.mv(self.zp_z_motor, parameters[0]))
+ msg = f"Moved Zone Plate z position to position: {parameters[0]}"
+ logger.info(msg)
+ return msg
+ else:
+ raise ValueError("parameter_ranges is not set")
diff --git a/src/eaa/tools/imaging/aps_mic/scan_control.py b/src/eaa/tools/imaging/aps_mic/scan_control.py
new file mode 100644
index 0000000..458a3ff
--- /dev/null
+++ b/src/eaa/tools/imaging/aps_mic/scan_control.py
@@ -0,0 +1,43 @@
+#initiating and loading the bluesky environment for the ISN
+
+# ruff: noqa: F403
+
+from eaa.tools.base import BaseTool
+from eaa.tools.imaging.aps_mic.acquisition import BlueSkyAcquireImage
+from eaa.tools.imaging.aps_mic.param_tuning import BlueskySetParameters
+
+
+class BlueskyScanControl(BaseTool):
+
+ from typing import Callable
+ from ophyd import EpicsMotor
+
+ samz_motor: EpicsMotor = None
+ scan2d_plan: Callable = None
+ scan1d_plan: Callable = None
+ acquire_image_tool: BlueSkyAcquireImage = None
+ param_tuning_tool: BlueskySetParameters = None
+
+ def __init__(self, require_approval: bool = False, *args, **kwargs):
+ super().__init__(require_approval=require_approval, *args, **kwargs)
+
+ try:
+ from s2idd_uprobe.startup import RE, oregistry, fly2d_scanrecord, step1d_scanrecord
+ except ImportError:
+ raise ImportError(
+ "Bluesky control initialization failed. Please check that the bluesky-mic package is installed "
+ "and the motors can only be reached from private subnet computers."
+ )
+
+ self.acquire_image_tool = BlueSkyAcquireImage()
+ self.acquire_image_tool.RE = RE
+ self.acquire_image_tool.savedata = oregistry["savedata"]
+ self.acquire_image_tool.scan2d_plan = fly2d_scanrecord
+ self.acquire_image_tool.scan1d_plan = step1d_scanrecord
+ self.acquire_image_tool.samy_motor = oregistry["samy"]
+
+ self.param_tuning_tool = BlueskySetParameters()
+ self.param_tuning_tool.RE = RE
+ self.param_tuning_tool.zp_z_motor = oregistry["zp_z"]
+
+ self.exposed_tools = self.acquire_image_tool.exposed_tools + self.param_tuning_tool.exposed_tools
diff --git a/src/eaa/tools/imaging/aps_mic/util.py b/src/eaa/tools/imaging/aps_mic/util.py
index 9e5df4f..e5fef8c 100644
--- a/src/eaa/tools/imaging/aps_mic/util.py
+++ b/src/eaa/tools/imaging/aps_mic/util.py
@@ -3,15 +3,54 @@
"""
import subprocess
+import logging
+from typing import Optional, Tuple
import os
-import h5py
+
import numpy as np
import matplotlib.pyplot as plt
-import logging
+import h5py
+
+from mic_vis.s2idd.mda import get_roi_from_mda
+import eaa.maths
logger = logging.getLogger(__name__)
+def validate_position_in_range(
+ center: Optional[float],
+ allowable_range: Optional[Tuple[float, float]],
+ axis_label: str,
+) -> None:
+ """Validate that a center position lies within an allowable range."""
+ if allowable_range is None:
+ return
+
+ if len(allowable_range) != 2:
+ raise ValueError(
+ f"The allowable range for the {axis_label} direction must contain exactly two values."
+ )
+
+ lower, upper = allowable_range
+ if lower > upper:
+ raise ValueError(
+ f"The allowable range for the {axis_label} direction "
+ f"({allowable_range}) has the lower bound greater than the upper bound."
+ )
+
+ if center is None:
+ raise ValueError(
+ f"The scan center position in the {axis_label} direction must be provided "
+ "when an allowable range is set."
+ )
+
+ if not lower <= center <= upper:
+ raise ValueError(
+ f"The scan center position in the {axis_label} direction {center} um is out "
+ f"of the allowable range {allowable_range} um."
+ )
+
+
def run_xrfmaps_exe(exe_path, args=None):
"""
Runs an executable file.
@@ -108,6 +147,100 @@ def process_xrfdata(
return None
+def plot_xrf_line_scan(x, y, val_gauss, fwhm, scan_name, roi_num):
+ """
+ Plot the XRF line scan data.
+ """
+ fig, ax = plt.subplots(1, 1, squeeze=True)
+ ax.plot(x, y, label="data")
+ if val_gauss is not None:
+ ax.plot(x, val_gauss, linestyle="--", color="red", label="fit")
+ ax.text(
+ 0.05,
+ 0.95,
+ f"FWHM = {fwhm:.2f}",
+ transform=ax.transAxes,
+ verticalalignment="top",
+ horizontalalignment="left"
+ )
+
+ ax.legend()
+ ax.set_xlabel("X-axis Position")
+ ax.set_ylabel("Intensity")
+ ax.set_title(f"{scan_name}-{roi_num}")
+ ax.grid(True)
+ plt.tight_layout()
+ return fig
+
+
+def save_xrf_line_scan(
+ mda_path: str,
+ output_dir: str,
+ roi_num: int,
+ y_threshold: float = 0.0,
+ return_line_array: bool = False
+) -> str | None:
+
+ """
+ Save the XRF line scan data in png format.
+
+ Parameters
+ ----------
+ mda_path : str
+ The path to the MDA file.
+ output_dir : str
+ The path to the output directory.
+ roi_num : int
+ The ROI number of the elements of interest.
+ y_threshold : float
+ The threshold for the Gaussian fit.
+ return_line_array : bool
+ If True, the line array will be returned.
+
+ Returns
+ -------
+ str | None
+ The path to the saved image.
+ """
+
+ try:
+ roi_data, position_data = get_roi_from_mda(mda_path, roi_num)
+ except Exception as e:
+ logger.error(f"Failed to get ROI or position data from MDA file {mda_path}: {e}")
+ return None
+
+ if any(data is None for data in [roi_data, position_data]):
+ logger.error(f"Failed to get ROI or position data from MDA file {mda_path}")
+ return None
+ else:
+ order = np.argsort(position_data)
+ x = position_data[order]
+ y = roi_data[order]
+
+ # Fit a Gaussian to the data
+ try:
+ a, mu, sigma, c = eaa.maths.fit_gaussian_1d(x, y, y_threshold=y_threshold)
+ val_gauss = eaa.maths.gaussian_1d(x, a, mu, sigma, c)
+ fwhm = 2.35 * abs(sigma)
+ except RuntimeError as e:
+ logger.error(f"Failed to fit Gaussian to data: {e}")
+ val_gauss = None
+ fwhm = np.nan
+
+ # Plot the data and the fit
+ scan_name = os.path.basename(mda_path)
+ fig = plot_xrf_line_scan(x, y, val_gauss, fwhm, scan_name, roi_num)
+ sc = scan_name.replace(".mda", "")
+ fname = f"{output_dir}/{sc}_ROI{roi_num}.png"
+ fig.savefig(fname)
+ plt.close(fig)
+ logger.info(f"Image saved to {fname}")
+ if return_line_array:
+ return fname, [x, y, val_gauss, fwhm]
+ else:
+ return fname
+
+
def load_h5(img_h5_path, fit_type=["NNLS", "ROI"], fsizelim=1e3) -> dict:
"""
Load the XRF data from the h5 file.
@@ -151,7 +284,11 @@ def load_h5(img_h5_path, fit_type=["NNLS", "ROI"], fsizelim=1e3) -> dict:
return None
-def plot_xrfdata(plotarr, xaxis, yaxis, scan_name, elm_name, cmap, vmax, vmin):
+def plot_xrfdata(
+ plotarr, xaxis, yaxis, scan_name, elm_name, cmap, vmax, vmin,
+ plot_in_log_scale: bool = False,
+ show_colorbar: bool = False
+):
"""
Plot the XRF data.
@@ -173,16 +310,27 @@ def plot_xrfdata(plotarr, xaxis, yaxis, scan_name, elm_name, cmap, vmax, vmin):
The maximum value of the colorbar.
vmin : float
The minimum value of the colorbar.
-
+ plot_in_log_scale : bool
+ Whether to plot the image in log scale.
+ show_colorbar : bool
+ Whether to show the colorbar in the image.
+
Returns
-------
matplotlib.figure.Figure
The figure object.
"""
fig, ax = plt.subplots(figsize=(5, 5))
- ax.imshow(plotarr, cmap=cmap, vmax=vmax, vmin=vmin)
+ if plot_in_log_scale:
+ plotarr = np.log10(plotarr + 1)
+ vmax = np.log10(vmax + 1)
+ vmin = np.log10(vmin + 1)
+ im = ax.imshow(plotarr, cmap=cmap, vmax=vmax, vmin=vmin)
ax.set_title(f"{scan_name} {elm_name}")
-
+ if show_colorbar:
+ cbar = fig.colorbar(im)
+ cbar.set_label("Intensity")
+
# Show only 5 ticks for both x- and y- axes
xticks = np.linspace(0, len(xaxis) - 1, 5, dtype=int)
yticks = np.linspace(0, len(yaxis) - 1, 5, dtype=int)
@@ -202,7 +350,9 @@ def save_xrfdata(
elms: list[str] = None,
vmax_th: float = 99,
vmin: float = 0,
- return_image_array: bool = False
+ plot_in_log_scale: bool = False,
+ return_image_array: bool = False,
+ show_colorbar_in_image: bool = False
) -> str | None:
"""
Save the XRF data in png format.
@@ -221,10 +371,14 @@ def save_xrfdata(
The threshold for the maximum percentile of the colorbar.
vmin : float
The minimum value of the colorbar.
+ plot_in_log_scale : bool
+ Whether to plot the image in log scale.
return_image_array : bool
If True, an numpy array of the image will be returned
in addition to the path to the saved image.
-
+ show_colorbar_in_image : bool
+ Whether to show the colorbar in the image.
+
Returns
-------
str | None
@@ -244,7 +398,11 @@ def save_xrfdata(
for e in plot_elms:
plotarr = data_arr[data_ch.index(e)]
vmax = np.nanpercentile(plotarr, vmax_th)
- fig = plot_xrfdata(plotarr, xaxis, yaxis, data["scan"], e, cmap, vmax, vmin)
+ fig = plot_xrfdata(
+ plotarr, xaxis, yaxis, data["scan"], e, cmap, vmax, vmin,
+ plot_in_log_scale=plot_in_log_scale,
+ show_colorbar=show_colorbar_in_image
+ )
fname = f"{output_dir}/{data['scan']}_{e}.png"
fig.savefig(fname)
plt.close(fig)
@@ -258,4 +416,4 @@ def save_xrfdata(
if return_image_array:
return None, None
else:
- return None
+ return None
\ No newline at end of file
diff --git a/src/eaa/tools/imaging/param_tuning.py b/src/eaa/tools/imaging/param_tuning.py
index 4429b12..96f3685 100644
--- a/src/eaa/tools/imaging/param_tuning.py
+++ b/src/eaa/tools/imaging/param_tuning.py
@@ -107,22 +107,6 @@ def update_parameter_history(self, parameters: list[float] | dict[str, float]):
self.parameter_history[self.parameter_names[i]].append(val)
-class BlueSkySetParameters(SetParameters):
-
- name: str = "bluesky_tune_optics_parameters"
-
- def __init__(
- self,
- *args,
- require_approval: bool = False,
- **kwargs,
- ):
- super().__init__(*args, require_approval=require_approval, **kwargs)
-
- def set_parameters(*args, **kwargs):
- raise NotImplementedError
-
-
class SimulatedSetParameters(SetParameters):
name: str = "simulated_tune_optics_parameters"
diff --git a/src/eaa/tools/imaging/registration.py b/src/eaa/tools/imaging/registration.py
index a4eaf5d..906b517 100644
--- a/src/eaa/tools/imaging/registration.py
+++ b/src/eaa/tools/imaging/registration.py
@@ -210,6 +210,5 @@ def register_images(
# Convert the offset from pixel units to physical units. We use psize_r here
# since the target image has already been resized to have the same pixel size
# as the reference image.
- if psize_t != psize_r:
- offset = offset * psize_r
+ offset = offset * psize_r
return offset
diff --git a/uv.lock b/uv.lock
index cb02248..a393cab 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1168,6 +1168,7 @@ aps-mic = [
{ name = "bluesky" },
{ name = "h5py" },
{ name = "mic-instrument" },
+ { name = "mic-vis" },
]
asksage = [
{ name = "asksageclient" },
@@ -1194,6 +1195,7 @@ requires-dist = [
{ name = "h5py", marker = "extra == 'aps-mic'" },
{ name = "matplotlib" },
{ name = "mic-instrument", marker = "extra == 'aps-mic'", git = "https://github.com/BCDA-APS/bluesky-mic.git" },
+ { name = "mic-vis", marker = "extra == 'aps-mic'", git = "https://github.com/grace227/mic-vis" },
{ name = "numpy" },
{ name = "openai" },
{ name = "psycopg", extras = ["binary"], marker = "extra == 'postgresql-vector-store'", specifier = ">=3.2.10" },
@@ -2658,6 +2660,17 @@ dependencies = [
{ name = "apsbits" },
]
+[[package]]
+name = "mic-vis"
+version = "0.1.0"
+source = { git = "https://github.com/grace227/mic-vis#51e95c07537ae10f4c7e1a002eeb9929b592d12a" }
+dependencies = [
+ { name = "h5py" },
+ { name = "matplotlib" },
+ { name = "numpy" },
+ { name = "pandas" },
+]
+
[[package]]
name = "mistune"
version = "3.1.3"