Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 51 additions & 14 deletions openfe/storage/warehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,16 @@ class WarehouseStores(TypedDict):
----------
setup : ExternalStorage
Storage location for setup-related objects and configurations.
result : ExternalStorage
Storage location for result-related object.

Notes
-----
Additional stores for results and tasks may be added in future versions.
"""

setup: ExternalStorage
# We will add a result and task store here in the future.
result: ExternalStorage


class WarehouseBaseClass:
Expand Down Expand Up @@ -63,7 +65,7 @@ def __repr__(self):
# probably should include repr of external store, too
return f"{self.__class__.__name__}({self.stores})"

def delete(self, store_name: Literal["setup"], location: str):
def delete(self, store_name: Literal["setup", "result"], location: str):
"""Delete an object from a specific store.

Parameters
Expand Down Expand Up @@ -106,6 +108,31 @@ def load_setup_tokenizable(self, obj: GufeKey) -> GufeTokenizable:
"""
return self._load_gufe_tokenizable(gufe_key=obj)

def store_result_tokenizable(self, obj: GufeTokenizable):
"""Store a GufeTokenizable object from the result store.

Parameters
----------
obj : GufeKey
The key of the object to store.
"""
return self._store_gufe_tokenizable("result", obj)

def load_result_tokenizable(self, obj: GufeKey) -> GufeTokenizable:
"""Load a GufeTokenizable object from the result store.

Parameters
----------
obj : GufeKey
The key of the object to load.

Returns
-------
GufeTokenizable
The loaded object.
"""
return self._load_gufe_tokenizable(gufe_key=obj)

def exists(self, key: GufeKey) -> bool:
"""Check if an object with the given key exists in any store.

Expand Down Expand Up @@ -144,15 +171,15 @@ def _get_store_for_key(self, key: GufeKey) -> ExternalStorage:
return self.stores[name]
raise ValueError(f"GufeKey {key} is not stored")

def _store_gufe_tokenizable(self, store_name: Literal["setup"], obj: GufeTokenizable):
def _store_gufe_tokenizable(self, store_name: Literal["setup", "result"], obj: GufeTokenizable):
"""Store a GufeTokenizable object with deduplication.

Parameters
----------
store_name : Literal["setup"]
Name of the store to store the object in.
obj : GufeTokenizable
The object to store.
Parameters
----------
store_name : Literal["setup"]
Name of the store to store the object in.
obj : GufeTokenizable
The object to store.

Notes
-----
Expand Down Expand Up @@ -246,15 +273,26 @@ def recursive_build_object_cache(key: GufeKey) -> GufeTokenizable:

@property
def setup_store(self):
"""Get the setup store.
"""Get the setup store

Returns
-------
ExternalStorage
The setup storage location.
The setup storage location
"""
return self.stores["setup"]

@property
def result_store(self):
"""Get the result store.

Returns
-------
ExternalStorage
The result storage location
"""
return self.stores["result"]


class FileSystemWarehouse(WarehouseBaseClass):
"""Warehouse implementation using local filesystem storage.
Expand All @@ -276,7 +314,6 @@ class FileSystemWarehouse(WarehouseBaseClass):

def __init__(self, root_dir: str = "warehouse"):
setup_store = FileStorage(f"{root_dir}/setup")
# When we add a result store it will look like this
# result_store = FileStorage(f"{root_dir}/results")
stores = WarehouseStores(setup=setup_store)
result_store = FileStorage(f"{root_dir}/result")
stores = WarehouseStores(setup=setup_store, result=result_store)
super().__init__(stores)
101 changes: 56 additions & 45 deletions openfe/tests/storage/test_warehouse.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import tempfile
from pathlib import Path
from typing import Literal
from unittest import mock

import pytest
Expand All @@ -19,28 +20,42 @@ def test_store_protocol_dag_result(self):
pytest.skip("Not implemented yet")

@staticmethod
def _test_store_load_same_process(obj, store_func_name, load_func_name):
store = MemoryStorage()
stores = WarehouseStores(setup=store)
def _test_store_load_same_process(
obj, store_func_name, load_func_name, store_name: Literal["setup", "result"]
):
setup_store = MemoryStorage()
result_store = MemoryStorage()
stores = WarehouseStores(setup=setup_store, result=result_store)
client = WarehouseBaseClass(stores)
store_func = getattr(client, store_func_name)
load_func = getattr(client, load_func_name)
assert store._data == {}
assert setup_store._data == {}
assert result_store._data == {}
store_func(obj)
assert store._data != {}
reloaded = load_func(obj.key)
store_under_test: MemoryStorage = stores[store_name]
assert store_under_test._data != {}
reloaded: GufeTokenizable = load_func(obj.key)
assert reloaded is obj
return reloaded, client

@staticmethod
def _test_store_load_different_process(obj: GufeTokenizable, store_func_name, load_func_name):
store = MemoryStorage()
stores = WarehouseStores(setup=store)
def _test_store_load_different_process(
obj: GufeTokenizable,
store_func_name,
load_func_name,
store_name: Literal["setup", "result"],
):
setup_store = MemoryStorage()
result_store = MemoryStorage()
stores = WarehouseStores(setup=setup_store, result=result_store)
client = WarehouseBaseClass(stores)
store_func = getattr(client, store_func_name)
load_func = getattr(client, load_func_name)
assert store._data == {}
assert setup_store._data == {}
assert result_store._data == {}
store_func(obj)
assert store._data != {}
store_under_test: MemoryStorage = stores[store_name]
assert store_under_test._data != {}
# make it look like we have an empty cache, as if this was a
# different process
key = obj.key
Expand All @@ -54,60 +69,56 @@ def _test_store_load_different_process(obj: GufeTokenizable, store_func_name, lo
"fixture",
["absolute_transformation", "complex_equilibrium"],
)
def test_store_load_transformation_same_process(self, request, fixture):
@pytest.mark.parametrize("store", ["setup", "result"])
def test_store_load_transformation_same_process(self, request, fixture, store):
transformation = request.getfixturevalue(fixture)
self._test_store_load_same_process(
transformation,
"store_setup_tokenizable",
"load_setup_tokenizable",
)
store_func_name = f"store_{store}_tokenizable"
load_func_name = f"load_{store}_tokenizable"
self._test_store_load_same_process(transformation, store_func_name, load_func_name, store)

@pytest.mark.parametrize(
"fixture",
["absolute_transformation", "complex_equilibrium"],
)
def test_store_load_transformation_different_process(self, request, fixture):
@pytest.mark.parametrize("store", ["setup", "result"])
def test_store_load_transformation_different_process(self, request, fixture, store):
transformation = request.getfixturevalue(fixture)
store_func_name = f"store_{store}_tokenizable"
load_func_name = f"load_{store}_tokenizable"
self._test_store_load_different_process(
transformation,
"store_setup_tokenizable",
"load_setup_tokenizable",
transformation, store_func_name, load_func_name, store
)

#
@pytest.mark.parametrize("fixture", ["benzene_variants_star_map"])
def test_store_load_network_same_process(self, request, fixture):
@pytest.mark.parametrize("store", ["setup", "result"])
def test_store_load_network_same_process(self, request, fixture, store):
network = request.getfixturevalue(fixture)
assert isinstance(network, GufeTokenizable)
self._test_store_load_same_process(
network, "store_setup_tokenizable", "load_setup_tokenizable"
)
store_func_name = f"store_{store}_tokenizable"
load_func_name = f"load_{store}_tokenizable"
self._test_store_load_same_process(network, store_func_name, load_func_name, store)

#
@pytest.mark.parametrize("fixture", ["benzene_variants_star_map"])
def test_store_load_network_different_process(self, request, fixture):
@pytest.mark.parametrize("store", ["setup", "result"])
def test_store_load_network_different_process(self, request, fixture, store):
network = request.getfixturevalue(fixture)
self._test_store_load_different_process(
network, "store_setup_tokenizable", "load_setup_tokenizable"
)
assert isinstance(network, GufeTokenizable)
store_func_name = f"store_{store}_tokenizable"
load_func_name = f"load_{store}_tokenizable"
self._test_store_load_different_process(network, store_func_name, load_func_name, store)

#
@pytest.mark.parametrize("fixture", ["benzene_variants_star_map"])
def test_delete(self, request, fixture):
store = MemoryStorage()
stores = WarehouseStores(setup=store)
client = WarehouseBaseClass(stores)

@pytest.mark.parametrize("store", ["setup", "result"])
def test_delete(self, request, fixture, store):
network = request.getfixturevalue(fixture)
assert store._data == {}
client.store_setup_tokenizable(network)
assert store._data != {}
key = network.key
loaded = client.load_setup_tokenizable(key)
assert loaded is network
assert client.setup_store.exists(key)
client.delete("setup", key)
assert not client.exists(key)
store_func_name = f"store_{store}_tokenizable"
load_func_name = f"load_{store}_tokenizable"
obj, client = self._test_store_load_same_process(
network, store_func_name, load_func_name, store
)
client.delete(store, obj.key)
assert not client.exists(obj.key)


class TestFileSystemWarehouse:
Expand Down
Loading