|
1 | 1 | import uuid |
2 | | -from typing import Dict, List, Optional, Union, cast |
| 2 | +from typing import Dict, List, Optional, Tuple, Union, cast |
3 | 3 |
|
4 | 4 | from pylabrobot import utils |
5 | 5 | from pylabrobot.liquid_handling.backends.backend import ( |
@@ -579,6 +579,61 @@ async def list_connected_modules(self) -> List[dict]: |
579 | 579 | """List all connected temperature modules.""" |
580 | 580 | return cast(List[dict], ot_api.modules.list_connected_modules()) |
581 | 581 |
|
| 582 | + def _pipette_id_for_channel(self, channel: int) -> str: |
| 583 | + pipettes = [] |
| 584 | + if self.left_pipette is not None: |
| 585 | + pipettes.append(self.left_pipette["pipetteId"]) |
| 586 | + if self.right_pipette is not None: |
| 587 | + pipettes.append(self.right_pipette["pipetteId"]) |
| 588 | + if channel < 0 or channel >= len(pipettes): |
| 589 | + raise NoChannelError(f"Channel {channel} not available on this OT-2 setup.") |
| 590 | + return pipettes[channel] |
| 591 | + |
| 592 | + def _current_channel_position(self, channel: int) -> Tuple[str, Coordinate]: |
| 593 | + """Return the pipette id and current coordinate for a given channel.""" |
| 594 | + |
| 595 | + pipette_id = self._pipette_id_for_channel(channel) |
| 596 | + try: |
| 597 | + res = ot_api.lh.save_position(pipette_id=pipette_id) |
| 598 | + pos = res["data"]["result"]["position"] |
| 599 | + current = Coordinate(pos["x"], pos["y"], pos["z"]) |
| 600 | + except Exception as exc: # noqa: BLE001 |
| 601 | + raise RuntimeError("Failed to query current pipette position") from exc |
| 602 | + |
| 603 | + return pipette_id, current |
| 604 | + |
| 605 | + async def prepare_for_manual_channel_operation(self, channel: int): |
| 606 | + """Validate channel exists (no-op otherwise for OT-2).""" |
| 607 | + |
| 608 | + _ = self._pipette_id_for_channel(channel) |
| 609 | + |
| 610 | + async def move_channel_x(self, channel: int, x: float): |
| 611 | + """Move a channel to an absolute x coordinate using savePosition to seed pose.""" |
| 612 | + |
| 613 | + pipette_id, current = self._current_channel_position(channel) |
| 614 | + target = Coordinate(x=x, y=current.y, z=current.z) |
| 615 | + await self.move_pipette_head( |
| 616 | + location=target, minimum_z_height=self.traversal_height, pipette_id=pipette_id |
| 617 | + ) |
| 618 | + |
| 619 | + async def move_channel_y(self, channel: int, y: float): |
| 620 | + """Move a channel to an absolute y coordinate using savePosition to seed pose.""" |
| 621 | + |
| 622 | + pipette_id, current = self._current_channel_position(channel) |
| 623 | + target = Coordinate(x=current.x, y=y, z=current.z) |
| 624 | + await self.move_pipette_head( |
| 625 | + location=target, minimum_z_height=self.traversal_height, pipette_id=pipette_id |
| 626 | + ) |
| 627 | + |
| 628 | + async def move_channel_z(self, channel: int, z: float): |
| 629 | + """Move a channel to an absolute z coordinate using savePosition to seed pose.""" |
| 630 | + |
| 631 | + pipette_id, current = self._current_channel_position(channel) |
| 632 | + target = Coordinate(x=current.x, y=current.y, z=z) |
| 633 | + await self.move_pipette_head( |
| 634 | + location=target, minimum_z_height=self.traversal_height, pipette_id=pipette_id |
| 635 | + ) |
| 636 | + |
582 | 637 | async def move_pipette_head( |
583 | 638 | self, |
584 | 639 | location: Coordinate, |
|
0 commit comments