diff --git a/diracx-client/src/diracx/client/_generated/_client.py b/diracx-client/src/diracx/client/_generated/_client.py index caf48034f..c5e641f24 100644 --- a/diracx-client/src/diracx/client/_generated/_client.py +++ b/diracx-client/src/diracx/client/_generated/_client.py @@ -15,7 +15,7 @@ from . import models as _models from ._configuration import DiracConfiguration from ._utils.serialization import Deserializer, Serializer -from .operations import AuthOperations, ConfigOperations, JobsOperations, WellKnownOperations +from .operations import AuthOperations, ConfigOperations, JobsOperations, RssOperations, WellKnownOperations class Dirac: # pylint: disable=client-accepts-api-version-keyword @@ -29,6 +29,8 @@ class Dirac: # pylint: disable=client-accepts-api-version-keyword :vartype config: _generated.operations.ConfigOperations :ivar jobs: JobsOperations operations :vartype jobs: _generated.operations.JobsOperations + :ivar rss: RssOperations operations + :vartype rss: _generated.operations.RssOperations :keyword endpoint: Service URL. Required. Default value is "". :paramtype endpoint: str """ @@ -65,6 +67,7 @@ def __init__( # pylint: disable=missing-client-constructor-parameter-credential self.auth = AuthOperations(self._client, self._config, self._serialize, self._deserialize) self.config = ConfigOperations(self._client, self._config, self._serialize, self._deserialize) self.jobs = JobsOperations(self._client, self._config, self._serialize, self._deserialize) + self.rss = RssOperations(self._client, self._config, self._serialize, self._deserialize) def send_request(self, request: HttpRequest, *, stream: bool = False, **kwargs: Any) -> HttpResponse: """Runs the network request through the client's chained policies. diff --git a/diracx-client/src/diracx/client/_generated/aio/_client.py b/diracx-client/src/diracx/client/_generated/aio/_client.py index 79ab383a9..f2d2b46bf 100644 --- a/diracx-client/src/diracx/client/_generated/aio/_client.py +++ b/diracx-client/src/diracx/client/_generated/aio/_client.py @@ -15,7 +15,7 @@ from .. import models as _models from .._utils.serialization import Deserializer, Serializer from ._configuration import DiracConfiguration -from .operations import AuthOperations, ConfigOperations, JobsOperations, WellKnownOperations +from .operations import AuthOperations, ConfigOperations, JobsOperations, RssOperations, WellKnownOperations class Dirac: # pylint: disable=client-accepts-api-version-keyword @@ -29,6 +29,8 @@ class Dirac: # pylint: disable=client-accepts-api-version-keyword :vartype config: _generated.aio.operations.ConfigOperations :ivar jobs: JobsOperations operations :vartype jobs: _generated.aio.operations.JobsOperations + :ivar rss: RssOperations operations + :vartype rss: _generated.aio.operations.RssOperations :keyword endpoint: Service URL. Required. Default value is "". :paramtype endpoint: str """ @@ -65,6 +67,7 @@ def __init__( # pylint: disable=missing-client-constructor-parameter-credential self.auth = AuthOperations(self._client, self._config, self._serialize, self._deserialize) self.config = ConfigOperations(self._client, self._config, self._serialize, self._deserialize) self.jobs = JobsOperations(self._client, self._config, self._serialize, self._deserialize) + self.rss = RssOperations(self._client, self._config, self._serialize, self._deserialize) def send_request( self, request: HttpRequest, *, stream: bool = False, **kwargs: Any diff --git a/diracx-client/src/diracx/client/_generated/aio/operations/__init__.py b/diracx-client/src/diracx/client/_generated/aio/operations/__init__.py index 6be34fb8a..77674abec 100644 --- a/diracx-client/src/diracx/client/_generated/aio/operations/__init__.py +++ b/diracx-client/src/diracx/client/_generated/aio/operations/__init__.py @@ -14,6 +14,7 @@ from ._operations import AuthOperations # type: ignore from ._operations import ConfigOperations # type: ignore from ._operations import JobsOperations # type: ignore +from ._operations import RssOperations # type: ignore from ._patch import __all__ as _patch_all from ._patch import * @@ -24,6 +25,7 @@ "AuthOperations", "ConfigOperations", "JobsOperations", + "RssOperations", ] __all__.extend([p for p in _patch_all if p not in __all__]) # pyright: ignore _patch_sdk() diff --git a/diracx-client/src/diracx/client/_generated/aio/operations/_operations.py b/diracx-client/src/diracx/client/_generated/aio/operations/_operations.py index 8aee57b46..4efe2dd6f 100644 --- a/diracx-client/src/diracx/client/_generated/aio/operations/_operations.py +++ b/diracx-client/src/diracx/client/_generated/aio/operations/_operations.py @@ -51,6 +51,10 @@ build_jobs_summary_request, build_jobs_unassign_bulk_jobs_sandboxes_request, build_jobs_unassign_job_sandboxes_request, + build_rss_get_compute_status_request, + build_rss_get_fts_status_request, + build_rss_get_site_status_request, + build_rss_get_storage_status_request, build_well_known_get_installation_metadata_request, build_well_known_get_jwks_request, build_well_known_get_openid_configuration_request, @@ -2319,3 +2323,303 @@ async def submit_jdl_jobs(self, body: Union[list[str], IO[bytes]], **kwargs: Any return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore + + +class RssOperations: + """ + .. warning:: + **DO NOT** instantiate this class directly. + + Instead, you should access the following operations through + :class:`~_generated.aio.Dirac`'s + :attr:`rss` attribute. + """ + + models = _models + + def __init__(self, *args, **kwargs) -> None: + input_args = list(args) + self._client: AsyncPipelineClient = input_args.pop(0) if input_args else kwargs.pop("client") + self._config: DiracConfiguration = input_args.pop(0) if input_args else kwargs.pop("config") + self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer") + self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer") + + @distributed_trace_async + async def get_storage_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.StorageElementStatus]: + """Get Storage Status. + + Get the latest status of storage elements, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to StorageElementStatus + :rtype: dict[str, ~_generated.models.StorageElementStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.StorageElementStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_storage_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{StorageElementStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace_async + async def get_compute_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.ComputeElementStatus]: + """Get Compute Status. + + Get the latest status of compute elements, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to ComputeElementStatus + :rtype: dict[str, ~_generated.models.ComputeElementStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.ComputeElementStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_compute_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{ComputeElementStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace_async + async def get_site_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.SiteStatus]: + """Get Site Status. + + Get the latest status of sites, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to SiteStatus + :rtype: dict[str, ~_generated.models.SiteStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.SiteStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_site_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{SiteStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace_async + async def get_fts_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.FTSStatus]: + """Get Fts Status. + + Get the latest status of FTS servers, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to FTSStatus + :rtype: dict[str, ~_generated.models.FTSStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.FTSStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_fts_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{FTSStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore diff --git a/diracx-client/src/diracx/client/_generated/models/__init__.py b/diracx-client/src/diracx/client/_generated/models/__init__.py index 14b5195d4..b4c06cc69 100644 --- a/diracx-client/src/diracx/client/_generated/models/__init__.py +++ b/diracx-client/src/diracx/client/_generated/models/__init__.py @@ -12,10 +12,16 @@ from ._models import ( # type: ignore + AllowedStatus, + BannedStatus, BodyAuthGetOidcToken, BodyAuthRevokeRefreshTokenByRefreshToken, BodyJobsRescheduleJobs, BodyJobsUnassignBulkJobsSandboxes, + ComputeElementStatus, + ComputeElementStatusAll, + FTSStatus, + FTSStatusAll, GroupInfo, HTTPValidationError, HeartbeatData, @@ -34,7 +40,14 @@ SearchParamsSearchItem, SetJobStatusReturn, SetJobStatusReturnSuccess, + SiteStatus, + SiteStatusAll, SortSpec, + StorageElementStatus, + StorageElementStatusCheck, + StorageElementStatusRead, + StorageElementStatusRemove, + StorageElementStatusWrite, SummaryParams, SummaryParamsSearchItem, SupportInfo, @@ -59,10 +72,16 @@ from ._patch import patch_sdk as _patch_sdk __all__ = [ + "AllowedStatus", + "BannedStatus", "BodyAuthGetOidcToken", "BodyAuthRevokeRefreshTokenByRefreshToken", "BodyJobsRescheduleJobs", "BodyJobsUnassignBulkJobsSandboxes", + "ComputeElementStatus", + "ComputeElementStatusAll", + "FTSStatus", + "FTSStatusAll", "GroupInfo", "HTTPValidationError", "HeartbeatData", @@ -81,7 +100,14 @@ "SearchParamsSearchItem", "SetJobStatusReturn", "SetJobStatusReturnSuccess", + "SiteStatus", + "SiteStatusAll", "SortSpec", + "StorageElementStatus", + "StorageElementStatusCheck", + "StorageElementStatusRead", + "StorageElementStatusRemove", + "StorageElementStatusWrite", "SummaryParams", "SummaryParamsSearchItem", "SupportInfo", diff --git a/diracx-client/src/diracx/client/_generated/models/_models.py b/diracx-client/src/diracx/client/_generated/models/_models.py index 888ec3b8a..730f15d6d 100644 --- a/diracx-client/src/diracx/client/_generated/models/_models.py +++ b/diracx-client/src/diracx/client/_generated/models/_models.py @@ -16,6 +16,70 @@ JSON = MutableMapping[str, Any] +class AllowedStatus(_serialization.Model): + """AllowedStatus. + + All required parameters must be populated in order to send to server. + + :ivar allowed: Allowed. Required. + :vartype allowed: bool + :ivar warnings: Warnings. + :vartype warnings: str + """ + + _validation = { + "allowed": {"required": True}, + } + + _attribute_map = { + "allowed": {"key": "allowed", "type": "bool"}, + "warnings": {"key": "warnings", "type": "str"}, + } + + def __init__(self, *, allowed: bool, warnings: Optional[str] = None, **kwargs: Any) -> None: + """ + :keyword allowed: Allowed. Required. + :paramtype allowed: bool + :keyword warnings: Warnings. + :paramtype warnings: str + """ + super().__init__(**kwargs) + self.allowed = allowed + self.warnings = warnings + + +class BannedStatus(_serialization.Model): + """BannedStatus. + + All required parameters must be populated in order to send to server. + + :ivar allowed: Allowed. Required. + :vartype allowed: bool + :ivar reason: Reason. + :vartype reason: str + """ + + _validation = { + "allowed": {"required": True}, + } + + _attribute_map = { + "allowed": {"key": "allowed", "type": "bool"}, + "reason": {"key": "reason", "type": "str"}, + } + + def __init__(self, *, allowed: bool, reason: str = "Unknown", **kwargs: Any) -> None: + """ + :keyword allowed: Allowed. Required. + :paramtype allowed: bool + :keyword reason: Reason. + :paramtype reason: str + """ + super().__init__(**kwargs) + self.allowed = allowed + self.reason = reason + + class BodyAuthGetOidcToken(_serialization.Model): """Body_auth_get_oidc_token. @@ -184,6 +248,66 @@ def __init__(self, *, job_ids: list[int], **kwargs: Any) -> None: self.job_ids = job_ids +class ComputeElementStatus(_serialization.Model): + """ComputeElementStatus. + + All required parameters must be populated in order to send to server. + + :ivar all: All. Required. + :vartype all: ~_generated.models.ComputeElementStatusAll + """ + + _validation = { + "all": {"required": True}, + } + + _attribute_map = { + "all": {"key": "all", "type": "ComputeElementStatusAll"}, + } + + def __init__(self, *, all: "_models.ComputeElementStatusAll", **kwargs: Any) -> None: + """ + :keyword all: All. Required. + :paramtype all: ~_generated.models.ComputeElementStatusAll + """ + super().__init__(**kwargs) + self.all = all + + +class ComputeElementStatusAll(_serialization.Model): + """All.""" + + +class FTSStatus(_serialization.Model): + """FTSStatus. + + All required parameters must be populated in order to send to server. + + :ivar all: All. Required. + :vartype all: ~_generated.models.FTSStatusAll + """ + + _validation = { + "all": {"required": True}, + } + + _attribute_map = { + "all": {"key": "all", "type": "FTSStatusAll"}, + } + + def __init__(self, *, all: "_models.FTSStatusAll", **kwargs: Any) -> None: + """ + :keyword all: All. Required. + :paramtype all: ~_generated.models.FTSStatusAll + """ + super().__init__(**kwargs) + self.all = all + + +class FTSStatusAll(_serialization.Model): + """All.""" + + class GroupInfo(_serialization.Model): """GroupInfo. @@ -1261,6 +1385,36 @@ def __init__( self.last_update_time = last_update_time +class SiteStatus(_serialization.Model): + """SiteStatus. + + All required parameters must be populated in order to send to server. + + :ivar all: All. Required. + :vartype all: ~_generated.models.SiteStatusAll + """ + + _validation = { + "all": {"required": True}, + } + + _attribute_map = { + "all": {"key": "all", "type": "SiteStatusAll"}, + } + + def __init__(self, *, all: "_models.SiteStatusAll", **kwargs: Any) -> None: + """ + :keyword all: All. Required. + :paramtype all: ~_generated.models.SiteStatusAll + """ + super().__init__(**kwargs) + self.all = all + + +class SiteStatusAll(_serialization.Model): + """All.""" + + class SortSpec(_serialization.Model): """SortSpec. @@ -1294,6 +1448,77 @@ def __init__(self, *, parameter: str, direction: Union[str, "_models.SortDirecti self.direction = direction +class StorageElementStatus(_serialization.Model): + """StorageElementStatus. + + All required parameters must be populated in order to send to server. + + :ivar read: Read. Required. + :vartype read: ~_generated.models.StorageElementStatusRead + :ivar write: Write. Required. + :vartype write: ~_generated.models.StorageElementStatusWrite + :ivar check: Check. Required. + :vartype check: ~_generated.models.StorageElementStatusCheck + :ivar remove: Remove. Required. + :vartype remove: ~_generated.models.StorageElementStatusRemove + """ + + _validation = { + "read": {"required": True}, + "write": {"required": True}, + "check": {"required": True}, + "remove": {"required": True}, + } + + _attribute_map = { + "read": {"key": "read", "type": "StorageElementStatusRead"}, + "write": {"key": "write", "type": "StorageElementStatusWrite"}, + "check": {"key": "check", "type": "StorageElementStatusCheck"}, + "remove": {"key": "remove", "type": "StorageElementStatusRemove"}, + } + + def __init__( + self, + *, + read: "_models.StorageElementStatusRead", + write: "_models.StorageElementStatusWrite", + check: "_models.StorageElementStatusCheck", + remove: "_models.StorageElementStatusRemove", + **kwargs: Any + ) -> None: + """ + :keyword read: Read. Required. + :paramtype read: ~_generated.models.StorageElementStatusRead + :keyword write: Write. Required. + :paramtype write: ~_generated.models.StorageElementStatusWrite + :keyword check: Check. Required. + :paramtype check: ~_generated.models.StorageElementStatusCheck + :keyword remove: Remove. Required. + :paramtype remove: ~_generated.models.StorageElementStatusRemove + """ + super().__init__(**kwargs) + self.read = read + self.write = write + self.check = check + self.remove = remove + + +class StorageElementStatusCheck(_serialization.Model): + """Check.""" + + +class StorageElementStatusRead(_serialization.Model): + """Read.""" + + +class StorageElementStatusRemove(_serialization.Model): + """Remove.""" + + +class StorageElementStatusWrite(_serialization.Model): + """Write.""" + + class SummaryParams(_serialization.Model): """SummaryParams. diff --git a/diracx-client/src/diracx/client/_generated/operations/__init__.py b/diracx-client/src/diracx/client/_generated/operations/__init__.py index 6be34fb8a..77674abec 100644 --- a/diracx-client/src/diracx/client/_generated/operations/__init__.py +++ b/diracx-client/src/diracx/client/_generated/operations/__init__.py @@ -14,6 +14,7 @@ from ._operations import AuthOperations # type: ignore from ._operations import ConfigOperations # type: ignore from ._operations import JobsOperations # type: ignore +from ._operations import RssOperations # type: ignore from ._patch import __all__ as _patch_all from ._patch import * @@ -24,6 +25,7 @@ "AuthOperations", "ConfigOperations", "JobsOperations", + "RssOperations", ] __all__.extend([p for p in _patch_all if p not in __all__]) # pyright: ignore _patch_sdk() diff --git a/diracx-client/src/diracx/client/_generated/operations/_operations.py b/diracx-client/src/diracx/client/_generated/operations/_operations.py index 11ffdcff7..69089682a 100644 --- a/diracx-client/src/diracx/client/_generated/operations/_operations.py +++ b/diracx-client/src/diracx/client/_generated/operations/_operations.py @@ -565,6 +565,118 @@ def build_jobs_submit_jdl_jobs_request(**kwargs: Any) -> HttpRequest: return HttpRequest(method="POST", url=_url, headers=_headers, **kwargs) +def build_rss_get_storage_status_request( + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any +) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/rss/storage" + + # Construct headers + if if_modified_since is not None: + _headers["if-modified-since"] = _SERIALIZER.header("if_modified_since", if_modified_since, "str") + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + if_match = prep_if_match(etag, match_condition) + if if_match is not None: + _headers["If-Match"] = _SERIALIZER.header("if_match", if_match, "str") + if_none_match = prep_if_none_match(etag, match_condition) + if if_none_match is not None: + _headers["If-None-Match"] = _SERIALIZER.header("if_none_match", if_none_match, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + +def build_rss_get_compute_status_request( + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any +) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/rss/compute" + + # Construct headers + if if_modified_since is not None: + _headers["if-modified-since"] = _SERIALIZER.header("if_modified_since", if_modified_since, "str") + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + if_match = prep_if_match(etag, match_condition) + if if_match is not None: + _headers["If-Match"] = _SERIALIZER.header("if_match", if_match, "str") + if_none_match = prep_if_none_match(etag, match_condition) + if if_none_match is not None: + _headers["If-None-Match"] = _SERIALIZER.header("if_none_match", if_none_match, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + +def build_rss_get_site_status_request( + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any +) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/rss/site" + + # Construct headers + if if_modified_since is not None: + _headers["if-modified-since"] = _SERIALIZER.header("if_modified_since", if_modified_since, "str") + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + if_match = prep_if_match(etag, match_condition) + if if_match is not None: + _headers["If-Match"] = _SERIALIZER.header("if_match", if_match, "str") + if_none_match = prep_if_none_match(etag, match_condition) + if if_none_match is not None: + _headers["If-None-Match"] = _SERIALIZER.header("if_none_match", if_none_match, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + +def build_rss_get_fts_status_request( + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any +) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/rss/fts" + + # Construct headers + if if_modified_since is not None: + _headers["if-modified-since"] = _SERIALIZER.header("if_modified_since", if_modified_since, "str") + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + if_match = prep_if_match(etag, match_condition) + if if_match is not None: + _headers["If-Match"] = _SERIALIZER.header("if_match", if_match, "str") + if_none_match = prep_if_none_match(etag, match_condition) + if if_none_match is not None: + _headers["If-None-Match"] = _SERIALIZER.header("if_none_match", if_none_match, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + class WellKnownOperations: """ .. warning:: @@ -2818,3 +2930,303 @@ def submit_jdl_jobs(self, body: Union[list[str], IO[bytes]], **kwargs: Any) -> l return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore + + +class RssOperations: + """ + .. warning:: + **DO NOT** instantiate this class directly. + + Instead, you should access the following operations through + :class:`~_generated.Dirac`'s + :attr:`rss` attribute. + """ + + models = _models + + def __init__(self, *args, **kwargs) -> None: + input_args = list(args) + self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client") + self._config: DiracConfiguration = input_args.pop(0) if input_args else kwargs.pop("config") + self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer") + self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer") + + @distributed_trace + def get_storage_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.StorageElementStatus]: + """Get Storage Status. + + Get the latest status of storage elements, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to StorageElementStatus + :rtype: dict[str, ~_generated.models.StorageElementStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.StorageElementStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_storage_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{StorageElementStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace + def get_compute_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.ComputeElementStatus]: + """Get Compute Status. + + Get the latest status of compute elements, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to ComputeElementStatus + :rtype: dict[str, ~_generated.models.ComputeElementStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.ComputeElementStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_compute_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{ComputeElementStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace + def get_site_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.SiteStatus]: + """Get Site Status. + + Get the latest status of sites, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to SiteStatus + :rtype: dict[str, ~_generated.models.SiteStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.SiteStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_site_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{SiteStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace + def get_fts_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.FTSStatus]: + """Get Fts Status. + + Get the latest status of FTS servers, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to FTSStatus + :rtype: dict[str, ~_generated.models.FTSStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.FTSStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_fts_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{FTSStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore diff --git a/diracx-core/src/diracx/core/config/__init__.py b/diracx-core/src/diracx/core/config/__init__.py index 35d5fa4e9..15c0c4970 100644 --- a/diracx-core/src/diracx/core/config/__init__.py +++ b/diracx-core/src/diracx/core/config/__init__.py @@ -3,6 +3,8 @@ from __future__ import annotations __all__ = [ + "AsyncCacheableSource", + "CacheableSource", "Config", "ConfigSource", "ConfigSourceUrl", @@ -31,6 +33,8 @@ UserConfig, ) from .sources import ( + AsyncCacheableSource, + CacheableSource, ConfigSource, ConfigSourceUrl, LocalGitConfigSource, diff --git a/diracx-core/src/diracx/core/config/sources.py b/diracx-core/src/diracx/core/config/sources.py index f16fffa82..8072cb1ef 100644 --- a/diracx-core/src/diracx/core/config/sources.py +++ b/diracx-core/src/diracx/core/config/sources.py @@ -22,7 +22,7 @@ from diracx.core.exceptions import BadConfigurationVersionError from diracx.core.extensions import DiracEntryPoint, select_from_extension -from diracx.core.utils import TwoLevelCache +from diracx.core.utils import AsyncTwoLevelCache, TwoLevelCache from .schema import Config @@ -139,6 +139,55 @@ def clear_caches(self): self._content_cache.clear() +class AsyncCacheableSource(Generic[T], metaclass=ABCMeta): + """Abstract base class for async sources that can be cached. + + Async equivalent of CacheableSource. Uses AsyncTwoLevelCache so populate + functions are native coroutines. + """ + + def __init__(self): + self._revision_cache = AsyncTwoLevelCache( + soft_ttl=DEFAULT_CS_REV_CACHE_SOFT_TTL, + hard_ttl=DEFAULT_CS_REV_CACHE_HARD_TTL, + max_items=1, + ) + self._content_cache: Cache = LRUCache(maxsize=2) + + @abstractmethod + async def latest_revision(self) -> tuple[str, datetime]: + """Return (revision_str, modified) identifying the current revision.""" + + @abstractmethod + async def read_raw(self, hexsha: str, modified: datetime) -> T: + """Fetch and return the data for the given revision.""" + + async def _read_work(self) -> str: + hexsha, modified = await self.latest_revision() + if hexsha not in self._content_cache: + self._content_cache[hexsha] = await self.read_raw(hexsha, modified) + return hexsha + + async def read(self) -> T: + """Blocking read — awaits refresh on a hard cache miss.""" + hexsha = await self._revision_cache.get( + "latest_revision", self._read_work, blocking=True + ) + return self._content_cache[hexsha] + + async def read_non_blocking(self) -> T: + """Non-blocking read — raises NotReadyError on a hard cache miss.""" + hexsha = await self._revision_cache.get( + "latest_revision", self._read_work, blocking=False + ) + return self._content_cache[hexsha] + + async def clear_caches(self): + """Clear the caches.""" + await self._revision_cache.clear() + self._content_cache.clear() + + class ConfigSource(CacheableSource[Config]): """Abstract class for the configuration source. diff --git a/diracx-core/src/diracx/core/models/rss.py b/diracx-core/src/diracx/core/models/rss.py index 6f022b38f..10e1444a9 100644 --- a/diracx-core/src/diracx/core/models/rss.py +++ b/diracx-core/src/diracx/core/models/rss.py @@ -1,10 +1,23 @@ from __future__ import annotations +from dataclasses import dataclass +from datetime import datetime from enum import StrEnum -from typing import Annotated, Literal, Union +from typing import Annotated, Generic, Literal, TypeVar, Union from pydantic import BaseModel, Field +T = TypeVar("T") + + +@dataclass(frozen=True) +class Snapshot(Generic[T]): + """Wraps a cached data payload with its cache metadata.""" + + data: T + hexsha: str + modified: datetime + class AllowedStatus(BaseModel): allowed: Literal[True] diff --git a/diracx-core/src/diracx/core/utils.py b/diracx-core/src/diracx/core/utils.py index 8949d07f3..c1ac4c7a3 100644 --- a/diracx-core/src/diracx/core/utils.py +++ b/diracx-core/src/diracx/core/utils.py @@ -2,6 +2,7 @@ __all__ = [ "EXPIRES_GRACE_SECONDS", + "AsyncTwoLevelCache", "TwoLevelCache", "batched_async", "dotenv_files_from_environment", @@ -11,6 +12,7 @@ "write_credentials", ] +import asyncio import fcntl import json import logging @@ -19,7 +21,7 @@ import stat import threading from collections import defaultdict -from collections.abc import Callable +from collections.abc import Callable, Coroutine from concurrent.futures import Future, ThreadPoolExecutor from datetime import datetime, timedelta, timezone from pathlib import Path @@ -293,6 +295,152 @@ def clear(self): self.locks.clear() +class AsyncTwoLevelCache: + """Async equivalent of TwoLevelCache, for use with async populate functions. + + Mirrors the two-TTL semantics of TwoLevelCache exactly: a soft TTL that + triggers a background refresh while still serving the stale value, and a + hard TTL beyond which a miss either awaits a fresh value (blocking=True) or + raises NotReadyError (blocking=False). + + The key difference from TwoLevelCache is that all coordination uses asyncio + primitives (asyncio.Lock, asyncio.Task) rather than a ThreadPoolExecutor, + so populate_func can be a native coroutine. + + Attributes: + soft_cache (TTLCache): A cache with a shorter TTL for quick access. + hard_cache (TTLCache): A cache with a longer TTL as a fallback. + tasks (dict): In-flight refresh Tasks keyed by cache key. + _lock (asyncio.Lock): Guards task creation to ensure single-flight behaviour. + + Args: + soft_ttl (int): Time-to-live in seconds for the soft cache. + hard_ttl (int): Time-to-live in seconds for the hard cache. + max_items (int): Maximum number of items in each cache tier. + + Example: + >>> cache = AsyncTwoLevelCache(soft_ttl=5, hard_ttl=3600) + >>> async def populate(): + ... return await some_db_query() + >>> value = await cache.get("key", populate) + + """ + + def __init__( + self, + soft_ttl: int, + hard_ttl: int, + *, + max_items: int = 1_000_000, + ): + """Initialize the AsyncTwoLevelCache with specified TTLs.""" + self.soft_cache: Cache = TTLCache(max_items, soft_ttl) + self.hard_cache: Cache = TTLCache(max_items, hard_ttl) + # One Task per key for single-flight refresh deduplication. + self.tasks: dict[str, asyncio.Task] = {} + # A single lock guards task creation across all keys. + # Per-key locks would be cleaner but require careful cleanup; + # contention here is minimal since task creation is very fast. + self._lock = asyncio.Lock() + + async def get( + self, + key: str, + populate_func: Callable[[], Coroutine[Any, Any, T]], + blocking: bool = True, + ) -> T: + """Retrieve a value from the cache, populating it if necessary. + + Checks the soft cache first. On a soft miss, kicks off a background + refresh and returns the stale hard-cache value if one exists. On a hard + miss, either awaits the refresh (blocking=True) or raises NotReadyError + (blocking=False). + + Args: + key (str): The cache key to retrieve or populate. + populate_func: An async callable (coroutine function) that returns + the value to cache. + blocking (bool): If True, wait for the populate_func to complete on + a hard miss. If False, raise NotReadyError instead. + + Returns: + The cached value associated with the key. + + """ + # Fast path: soft cache hit, no locking needed. + if key in self.soft_cache: + return self.soft_cache[key] + + async with self._lock: + # Re-check inside the lock in case another coroutine just populated it. + if key in self.soft_cache: + return self.soft_cache[key] + + # Ensure at most one refresh Task is in flight for this key. + if key not in self.tasks or self.tasks[key].done(): + self.tasks[key] = asyncio.create_task(self._work(key, populate_func)) + task = self.tasks[key] + + if key in self.hard_cache: + # Soft miss but hard hit: serve stale while the refresh runs. + # Pre-fill soft cache so the next request skips the lock entirely. + result = self.hard_cache[key] + self.soft_cache[key] = result + return result + + # Hard miss: no value in either cache yet. + if blocking: + # Await outside the lock so _work can acquire it to write results. + await task + return self.hard_cache[key] + + logger.debug( + "Cache key %r not ready yet, background population in progress", key + ) + raise NotReadyError(f"Cache key {key} is not ready yet.") + + async def _work( + self, key: str, populate_func: Callable[[], Coroutine[Any, Any, T]] + ) -> None: + """Await populate_func and write results into both cache tiers. + + Always removes the task entry so the next soft miss can schedule a fresh + refresh, regardless of whether this attempt succeeded or failed. + + Args: + key (str): The cache key to populate. + populate_func: Async callable that produces the value. + + """ + success = False + result = None + try: + result = await populate_func() + success = True + except Exception: + logger.error( + "Failed to populate cache key %r, will retry on next request", + key, + exc_info=True, + ) + raise + finally: + async with self._lock: + self.tasks.pop(key, None) + if success: + self.hard_cache[key] = result + self.soft_cache[key] = result + + async def clear(self): + """Cancel any in-flight refresh tasks and clear both cache tiers.""" + async with self._lock: + for task in self.tasks.values(): + task.cancel() + self.tasks.clear() + self.soft_cache.clear() + self.hard_cache.clear() + + async def batched_async( iterable: AsyncIterable[T], n: int, *, strict: bool = False ) -> AsyncIterable[tuple[T, ...]]: diff --git a/diracx-db/src/diracx/db/sql/rss/db.py b/diracx-db/src/diracx/db/sql/rss/db.py index a891995f6..7fe095da8 100644 --- a/diracx-db/src/diracx/db/sql/rss/db.py +++ b/diracx-db/src/diracx/db/sql/rss/db.py @@ -1,6 +1,8 @@ from __future__ import annotations -from sqlalchemy import select +from datetime import datetime, timezone + +from sqlalchemy import insert, select from sqlalchemy.engine import Row from diracx.core.exceptions import ResourceNotFoundError @@ -18,37 +20,179 @@ class ResourceStatusDB(BaseSQLDB): metadata = RSSBase.metadata - async def get_site_status(self, name: str, vo: str = "all") -> tuple[str, str]: - stmt = select(SiteStatus.status, SiteStatus.reason).where( - SiteStatus.name == name, - SiteStatus.status_type == "all", - SiteStatus.vo == vo, - ) + async def get_site_statuses(self) -> list[tuple[str, str, str, str]]: + """Return all site statuses across all VOs. + + Returns: + List of (name, status, reason, vo) tuples. + + """ + stmt = select( + SiteStatus.name, + SiteStatus.status, + SiteStatus.reason, + SiteStatus.vo, + ).where(SiteStatus.status_type == "all") result = await self.conn.execute(stmt) - row = result.one_or_none() - if not row: - raise ResourceNotFoundError(name) + rows = result.all() + if not rows: + raise ResourceNotFoundError("Site statuses") - return row.Status, row.Reason + return [(row.Name, row.Status, row.Reason, row.VO) for row in rows] - async def get_resource_status( + async def get_resource_statuses( self, - name: str, status_types: list[str] | None = None, - vo: str = "all", - ) -> dict[str, Row]: + ) -> dict[str, dict[str, Row]]: + """Return resource statuses for the given status types across all VOs. + + Args: + status_types: Status type filter (e.g. ["ReadAccess", "WriteAccess"]). + Defaults to ["all"]. + + Returns: + Nested dict keyed by resource name then status type. + + """ if not status_types: status_types = ["all"] stmt = select( - ResourceStatus.status, ResourceStatus.reason, ResourceStatus.status_type + ResourceStatus.name, + ResourceStatus.status, + ResourceStatus.reason, + ResourceStatus.status_type, + ResourceStatus.vo, ).where( - ResourceStatus.name == name, ResourceStatus.status_type.in_(status_types), - ResourceStatus.vo == vo, ) result = await self.conn.execute(stmt) rows = result.all() if not rows: - raise ResourceNotFoundError(name) - return {row.StatusType: row for row in rows} + raise ResourceNotFoundError("Resource statuses") + + statuses: dict[str, dict[str, Row]] = {} + for row in rows: + if row.Name not in statuses: + statuses[row.Name] = {} + statuses[row.Name][row.StatusType] = row + return statuses + + async def get_resource_status_date( + self, + status_types: list[str] | None = None, + ) -> Row[tuple[datetime, datetime]]: + """Return the most recent DateEffective across all VOs for the given status types. + + Args: + status_types: Status type filter. Defaults to ["all"]. + + Returns: + Row with (date_effective, last_check_time) for the most recent entry. + + """ + if not status_types: + status_types = ["all"] + stmt = ( + select( + ResourceStatus.date_effective, + ResourceStatus.last_check_time, + ) + .where(ResourceStatus.status_type.in_(status_types)) + .order_by(ResourceStatus.date_effective.desc()) + .limit(1) + ) + result = await self.conn.execute(stmt) + row = result.first() + if not row: + raise ResourceNotFoundError("Resource statuses") + return row + + async def get_site_status_date(self) -> Row[tuple[datetime, datetime]]: + """Return the most recent DateEffective from the SiteStatus table across all VOs. + + Returns: + Row with (date_effective, last_check_time) for the most recent entry. + + """ + stmt = ( + select( + SiteStatus.date_effective, + SiteStatus.last_check_time, + ) + .where(SiteStatus.status_type == "all") + .order_by(SiteStatus.date_effective.desc()) + .limit(1) + ) + result = await self.conn.execute(stmt) + row = result.first() + if not row: + raise ResourceNotFoundError("Site statuses") + return row + + async def insert_resource_status( + self, + name: str, + status: str, + status_type: str, + vo: str, + reason: str = "", + date_effective: datetime | None = None, + last_check_time: datetime | None = None, + ) -> None: + """Insert a single ResourceStatus row. + + Args: + name: Resource name. + status: Status value. + status_type: One of "all", "ReadAccess", "WriteAccess", etc. + vo: Virtual organisation (e.g. "lhcb", "all"). + reason: Human-readable reason string. + date_effective: Timestamp when the status became effective. + Defaults to now. + last_check_time: Timestamp of last check. Defaults to now. + + """ + now = datetime.now(timezone.utc) + stmt = insert(ResourceStatus).values( + Name=name, + Status=status, + StatusType=status_type, + VO=vo, + Reason=reason, + DateEffective=date_effective or now, + LastCheckTime=last_check_time or now, + ) + await self.conn.execute(stmt) + + async def insert_site_status( + self, + name: str, + status: str, + vo: str, + reason: str = "", + date_effective: datetime | None = None, + last_check_time: datetime | None = None, + ) -> None: + """Insert a single SiteStatus row. + + Args: + name: Site name (e.g. "LCG.CERN.cern"). + status: Status value (e.g. "Active", "Banned"). + vo: Virtual organisation. + reason: Human-readable reason string. + date_effective: Defaults to now. + last_check_time: Defaults to now. + + """ + now = datetime.now(timezone.utc) + stmt = insert(SiteStatus).values( + Name=name, + Status=status, + StatusType="all", + VO=vo, + Reason=reason, + DateEffective=date_effective or now, + LastCheckTime=last_check_time or now, + ) + await self.conn.execute(stmt) diff --git a/diracx-db/tests/rss/test_rss_db.py b/diracx-db/tests/rss/test_rss_db.py index 2956801ee..6e199619f 100644 --- a/diracx-db/tests/rss/test_rss_db.py +++ b/diracx-db/tests/rss/test_rss_db.py @@ -5,7 +5,6 @@ import pytest from sqlalchemy import insert -from diracx.core.exceptions import ResourceNotFoundError from diracx.db.sql.rss.db import ResourceStatusDB _NOW = datetime(2024, 1, 1, tzinfo=timezone.utc) @@ -41,14 +40,13 @@ async def test_site_status(rss_db: ResourceStatusDB): # Test with the test Site (should be found) async with rss_db as db: - status, reason = await db.get_site_status("TestSite") + rows = await db.get_site_statuses() + assert rows + name, status, reason, vo = rows[0] + assert name == "TestSite" assert status == "Active" assert reason == "All good" - - # Test with an unknow Site (should not be found) - with pytest.raises(ResourceNotFoundError): - async with rss_db as db: - await db.get_site_status("Unknown") + assert vo == "all" async def test_resource_status(rss_db: ResourceStatusDB): @@ -102,34 +100,35 @@ async def test_resource_status(rss_db: ResourceStatusDB): # Test with the test Compute Element (should be found) async with rss_db as db: - result = await db.get_resource_status("TestCompute") + result = await db.get_resource_statuses() + assert "TestCompute" in result + result = result["TestCompute"] assert "all" in result assert result["all"].Status == "Active" assert result["all"].Reason == "All good" + assert result["all"].VO == "all" # Test with the test FTS (should be found) async with rss_db as db: - result = await db.get_resource_status("TestFTS") + result = await db.get_resource_statuses() + assert "TestFTS" in result + result = result["TestFTS"] assert "all" in result assert result["all"].Status == "Active" assert result["all"].Reason == "All good" + assert result["all"].VO == "all" # Test with the test Storage Element (should be found) async with rss_db as db: - result = await db.get_resource_status( - "TestStorage", ["ReadAccess", "WriteAccess", "CheckAccess", "RemoveAccess"] + result = await db.get_resource_statuses( + ["ReadAccess", "WriteAccess", "CheckAccess", "RemoveAccess"] ) - assert set(result.keys()) == { + assert set(result["TestStorage"].keys()) == { "ReadAccess", "WriteAccess", "CheckAccess", "RemoveAccess", } - for row in result.values(): + for row in result["TestStorage"].values(): assert row.Status == "Active" assert row.Reason == "All good" - - # Test with an unknow Resource (should not be found) - with pytest.raises(ResourceNotFoundError): - async with rss_db as db: - await db.get_resource_status("Unknown") diff --git a/diracx-logic/src/diracx/logic/rss/query.py b/diracx-logic/src/diracx/logic/rss/query.py index 7ce83cbce..cd6a87d46 100644 --- a/diracx-logic/src/diracx/logic/rss/query.py +++ b/diracx-logic/src/diracx/logic/rss/query.py @@ -35,36 +35,81 @@ def map_status(db_status: str, reason: str | None = None) -> ResourceStatus: ) -async def get_site_status( - resource_status_db: ResourceStatusDB, name: str, vo: str -) -> SiteStatusModel: - status, reason = await resource_status_db.get_site_status(name, vo) - return SiteStatusModel(all=map_status(status, reason)) +async def get_site_statuses( + resource_status_db: ResourceStatusDB, +) -> dict[str, dict[str, SiteStatusModel]]: + """Fetch all site statuses across all VOs. + The returned models carry the vo field so the router can filter to the + caller's VO from the cached all-VO snapshot. + """ + rows = await resource_status_db.get_site_statuses() -async def get_compute_status( - resource_status_db: ResourceStatusDB, name: str, vo: str -) -> ComputeElementStatus: - rows = await resource_status_db.get_resource_status(name, ["all"], vo) - return ComputeElementStatus(all=map_status(rows["all"].Status, rows["all"].Reason)) + result: dict[str, dict[str, SiteStatusModel]] = {} + for name, status, reason, vo in rows: + vo = vo or "all" + if vo not in result: + result[vo] = {} + result[vo][name] = SiteStatusModel(all=map_status(status, reason)) -async def get_fts_status( - resource_status_db: ResourceStatusDB, name: str, vo: str -) -> FTSStatus: - rows = await resource_status_db.get_resource_status(name, ["all"], vo) - return FTSStatus(all=map_status(rows["all"].Status, rows["all"].Reason)) + return result -async def get_storage_status( - resource_status_db: ResourceStatusDB, name: str, vo: str -) -> StorageElementStatus: - rows = await resource_status_db.get_resource_status( - name, ["ReadAccess", "WriteAccess", "CheckAccess", "RemoveAccess"], vo - ) - return StorageElementStatus( - read=map_status(rows["ReadAccess"].Status, rows["ReadAccess"].Reason), - write=map_status(rows["WriteAccess"].Status, rows["WriteAccess"].Reason), - check=map_status(rows["CheckAccess"].Status, rows["CheckAccess"].Reason), - remove=map_status(rows["RemoveAccess"].Status, rows["RemoveAccess"].Reason), +async def get_compute_statuses( + resource_status_db: ResourceStatusDB, +) -> dict[str, dict[str, ComputeElementStatus]]: + """Fetch all compute element statuses across all VOs.""" + all_rows = await resource_status_db.get_resource_statuses(["all"]) + + result: dict[str, dict[str, ComputeElementStatus]] = {} + for name, rows in all_rows.items(): + vo = rows["all"].VO or "all" + if vo not in result: + result[vo] = {} + result[vo][name] = ComputeElementStatus( + all=map_status(rows["all"].Status, rows["all"].Reason) + ) + + return result + + +async def get_fts_statuses( + resource_status_db: ResourceStatusDB, +) -> dict[str, dict[str, FTSStatus]]: + """Fetch all FTS server statuses across all VOs.""" + all_rows = await resource_status_db.get_resource_statuses(["all"]) + + result: dict[str, dict[str, FTSStatus]] = {} + for name, rows in all_rows.items(): + vo = rows["all"].VO or "all" + if vo not in result: + result[vo] = {} + result[vo][name] = FTSStatus( + all=map_status(rows["all"].Status, rows["all"].Reason) + ) + + return result + + +async def get_storage_statuses( + resource_status_db: ResourceStatusDB, +) -> dict[str, dict[str, StorageElementStatus]]: + """Fetch all storage element statuses across all VOs.""" + all_rows = await resource_status_db.get_resource_statuses( + ["ReadAccess", "WriteAccess", "CheckAccess", "RemoveAccess"] ) + + result: dict[str, dict[str, StorageElementStatus]] = {} + for name, rows in all_rows.items(): + vo = rows["ReadAccess"].VO or "all" + if vo not in result: + result[vo] = {} + result[vo][name] = StorageElementStatus( + read=map_status(rows["ReadAccess"].Status, rows["ReadAccess"].Reason), + write=map_status(rows["WriteAccess"].Status, rows["WriteAccess"].Reason), + check=map_status(rows["CheckAccess"].Status, rows["CheckAccess"].Reason), + remove=map_status(rows["RemoveAccess"].Status, rows["RemoveAccess"].Reason), + ) + + return result diff --git a/diracx-logic/src/diracx/logic/rss/source.py b/diracx-logic/src/diracx/logic/rss/source.py new file mode 100644 index 000000000..a00e08d48 --- /dev/null +++ b/diracx-logic/src/diracx/logic/rss/source.py @@ -0,0 +1,98 @@ +"""Resource Status System source classes. + +These classes live in diracx-logic so they can import from diracx-db without +violating the project's dependency flow: + + routers → logic → db → core +""" + +from __future__ import annotations + +import logging +from datetime import datetime +from typing import ClassVar + +from diracx.core.config.sources import AsyncCacheableSource +from diracx.core.models.rss import Snapshot +from diracx.db.sql.rss.db import ResourceStatusDB + +from .query import ( + get_compute_statuses, + get_fts_statuses, + get_site_statuses, + get_storage_statuses, +) + +logger = logging.getLogger(__name__) + + +class ResourceStatusSource(AsyncCacheableSource): + """Base caching source for Compute, Storage and FTS resource types. + + Subclasses declare `resource_type` as a class attribute — latest_revision + and _fetch dispatch on it automatically. + + One source instance per resource type covers all VOs. VO filtering is done + in the route after the snapshot is fetched from the cache. + """ + + resource_type: ClassVar[str] + + def __init__(self, *, db: ResourceStatusDB) -> None: + super().__init__() + self._db = db + + async def latest_revision(self) -> tuple[str, datetime]: + async with self._db as db: + row = await db.get_resource_status_date() + modified: datetime = row.DateEffective + return modified.isoformat(), modified + + async def read_raw(self, hexsha: str, modified: datetime) -> Snapshot: + async with self._db as db: + data = await self._fetch(db) + return Snapshot(data=data, hexsha=hexsha, modified=modified) + + async def _fetch(self, db: ResourceStatusDB) -> dict: + if self.resource_type == "StorageElement": + return await get_storage_statuses(db) + if self.resource_type == "ComputeElement": + return await get_compute_statuses(db) + if self.resource_type == "FTS": + return await get_fts_statuses(db) + raise ValueError(f"Unsupported resource_type: {self.resource_type!r}") + + +class StorageElementStatusSource(ResourceStatusSource): + resource_type = "StorageElement" + + +class ComputeElementStatusSource(ResourceStatusSource): + resource_type = "ComputeElement" + + +class FTSStatusSource(ResourceStatusSource): + resource_type = "FTS" + + +class SiteStatusSource(AsyncCacheableSource): + """Caching source for Site statuses. + + Uses its own DB table (SiteStatus) and a dedicated date query, so it is a + direct subclass of AsyncCacheableSource rather than ResourceStatusSource. + """ + + def __init__(self, *, db: ResourceStatusDB) -> None: + super().__init__() + self._db = db + + async def latest_revision(self) -> tuple[str, datetime]: + async with self._db as db: + row = await db.get_site_status_date() + modified: datetime = row.DateEffective + return modified.isoformat(), modified + + async def read_raw(self, hexsha: str, modified: datetime) -> Snapshot: + async with self._db as db: + data = await get_site_statuses(db) + return Snapshot(data=data, hexsha=hexsha, modified=modified) diff --git a/diracx-logic/tests/rss/test_rss.py b/diracx-logic/tests/rss/test_rss_query.py similarity index 100% rename from diracx-logic/tests/rss/test_rss.py rename to diracx-logic/tests/rss/test_rss_query.py diff --git a/diracx-logic/tests/rss/test_rss_source.py b/diracx-logic/tests/rss/test_rss_source.py new file mode 100644 index 000000000..4ef55df14 --- /dev/null +++ b/diracx-logic/tests/rss/test_rss_source.py @@ -0,0 +1,172 @@ +from __future__ import annotations + +from collections import namedtuple +from datetime import datetime, timezone +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from diracx.core.models.rss import ( + ComputeElementStatus, + FTSStatus, + SiteStatus, + StorageElementStatus, +) +from diracx.db.sql.rss.db import ResourceStatusDB +from diracx.logic.rss.source import ( + ComputeElementStatusSource, + FTSStatusSource, + SiteStatusSource, + StorageElementStatusSource, +) + + +@pytest.fixture +def mock_resource_status_db(): + """Fixture to mock the ResourceStatusDB.""" + db = MagicMock(spec=ResourceStatusDB) + DateRow = namedtuple("DateRow", ["DateEffective", "DateChecked"]) + db.__aenter__ = AsyncMock(return_value=db) + db.__aexit__ = AsyncMock(return_value=None) + db.get_resource_status_date = AsyncMock( + return_value=DateRow( + DateEffective=datetime.fromisoformat("2023-01-01T00:00:00+00:00"), + DateChecked=datetime.now(timezone.utc), + ) + ) + return db + + +async def test_latest_revision(mock_resource_status_db): + """Test the latest_revision method of ResourceStatusSource.""" + source = ComputeElementStatusSource(db=mock_resource_status_db) + + # Call the method + revision, modified = await source.latest_revision() + + # Verify the revision is generated correctly + assert revision + assert isinstance(modified, datetime) + + # Verify the database call + mock_resource_status_db.get_resource_status_date.assert_called_once() + + +async def test_read_raw_site(mock_resource_status_db): + """Test the read_raw method for Site resource type.""" + # Mock the database data + mock_db_data = [("testSite", "Active", "", "test_vo")] + + # Patch the get_site_statuses method of the database to return the mock data + mock_resource_status_db.get_site_statuses = AsyncMock(return_value=mock_db_data) + + # Initialize the ResourceStatusSource with the mocked database + source = SiteStatusSource(db=mock_resource_status_db) + + # Call the read_raw method, which internally calls get_site_statuses from query.py + result = await source.read_raw("test_revision", datetime.now(tz=timezone.utc)) + + # Verify the result matches the expected output + expected_result = {"testSite": SiteStatus(all={"allowed": True, "warnings": None})} + for key, value in expected_result.items(): + assert key in result.data["test_vo"] + assert value.model_dump() == result.data["test_vo"][key].model_dump() + # Verify that the database method was called correctly + mock_resource_status_db.get_site_statuses.assert_awaited_once() + + +async def test_read_raw_compute(mock_resource_status_db): + """Test the read_raw method for ComputeElement resource type.""" + ResourceStatus = namedtuple("ResourceStatus", ["Name", "Status", "Reason", "VO"]) + + mock_db_data = { + "TestCE": { + "all": ResourceStatus( + Name="TestCE", Status="Active", Reason="", VO="test_vo" + ) + } + } + mock_resource_status_db.get_resource_statuses = AsyncMock(return_value=mock_db_data) + + source = ComputeElementStatusSource(db=mock_resource_status_db) + + # Call the method + result = await source.read_raw("test_revision", datetime.now(tz=timezone.utc)) + + # Verify the result + expected_result = { + "TestCE": ComputeElementStatus(all={"allowed": True, "warnings": None}) + } + for key, value in expected_result.items(): + assert key in result.data["test_vo"] + assert value.model_dump() == result.data["test_vo"][key].model_dump() + mock_resource_status_db.get_resource_statuses.assert_awaited_once_with(["all"]) + + +async def test_read_raw_storage(mock_resource_status_db): + """Test the read_raw method for StorageElement resource type.""" + ResourceStatus = namedtuple("ResourceStatus", ["Name", "Status", "Reason", "VO"]) + + mock_db_data = { + "TestSE": { + "ReadAccess": ResourceStatus( + Name="TestSE", Status="Active", Reason=None, VO="test_vo" + ), + "WriteAccess": ResourceStatus( + Name="TestSE", Status="Active", Reason=None, VO="test_vo" + ), + "CheckAccess": ResourceStatus( + Name="TestSE", Status="Active", Reason=None, VO="test_vo" + ), + "RemoveAccess": ResourceStatus( + Name="TestSE", Status="Active", Reason=None, VO="test_vo" + ), + } + } + mock_resource_status_db.get_resource_statuses.return_value = mock_db_data + source = StorageElementStatusSource(db=mock_resource_status_db) + + # Call the method + result = await source.read_raw("test_revision", datetime.now(tz=timezone.utc)) + + # Verify the result + expected_result = { + "TestSE": StorageElementStatus( + read={"allowed": True, "warnings": None}, + write={"allowed": True, "warnings": None}, + check={"allowed": True, "warnings": None}, + remove={"allowed": True, "warnings": None}, + ) + } + for key, value in expected_result.items(): + assert key in result.data["test_vo"] + assert value.model_dump() == result.data["test_vo"][key].model_dump() + mock_resource_status_db.get_resource_statuses.assert_awaited_once_with( + ["ReadAccess", "WriteAccess", "CheckAccess", "RemoveAccess"] + ) + + +async def test_read_raw_fts(mock_resource_status_db): + """Test the read_raw method for FTS resource type.""" + ResourceStatus = namedtuple("ResourceStatus", ["Name", "Status", "Reason", "VO"]) + + mock_db_data = { + "FTS": { + "all": ResourceStatus( + Name="FTS", Status="Active", Reason=None, VO="test_vo" + ), + } + } + mock_resource_status_db.get_resource_statuses.return_value = mock_db_data + + source = FTSStatusSource(db=mock_resource_status_db) + + # Call the method + result = await source.read_raw("test_revision", datetime.now(tz=timezone.utc)) + + # Verify the result + expected_result = {"FTS": FTSStatus(all={"allowed": True, "warnings": None})} + for key, value in expected_result.items(): + assert key in result.data["test_vo"] + assert value.model_dump() == result.data["test_vo"][key].model_dump() + mock_resource_status_db.get_resource_statuses.assert_awaited_once_with(["all"]) diff --git a/diracx-routers/pyproject.toml b/diracx-routers/pyproject.toml index d798b1696..26e49317f 100644 --- a/diracx-routers/pyproject.toml +++ b/diracx-routers/pyproject.toml @@ -47,10 +47,12 @@ auth = "diracx.routers.auth:router" config = "diracx.routers.configuration:router" health = "diracx.routers.health:router" jobs = "diracx.routers.jobs:router" +rss = "diracx.routers.rss:router" [project.entry-points."diracx.access_policies"] wms = "diracx.routers.jobs.access_policies:WMSAccessPolicy" sandbox = "diracx.routers.jobs.access_policies:SandboxAccessPolicy" +rss = "diracx.routers.rss.access_policies:RSSAccessPolicy" # Minimum version of the client supported [project.entry-points."diracx.min_client_version"] @@ -87,5 +89,4 @@ markers = [ "enabled_dependencies: List of dependencies which should be available to the FastAPI test client", ] - asyncio_default_fixture_loop_scope = "function" diff --git a/diracx-routers/src/diracx/routers/rss/__init__.py b/diracx-routers/src/diracx/routers/rss/__init__.py new file mode 100644 index 000000000..1f8b8659e --- /dev/null +++ b/diracx-routers/src/diracx/routers/rss/__init__.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +__all__ = ["RSSAccessPolicy", "router"] + +from ..fastapi_classes import DiracxRouter +from .access_policies import RSSAccessPolicy +from .rss import router as rss_router + +router = DiracxRouter() +router.include_router(rss_router) diff --git a/diracx-routers/src/diracx/routers/rss/access_policies.py b/diracx-routers/src/diracx/routers/rss/access_policies.py new file mode 100644 index 000000000..b0b439976 --- /dev/null +++ b/diracx-routers/src/diracx/routers/rss/access_policies.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from collections.abc import Callable +from typing import Annotated + +from fastapi import Depends, HTTPException, status + +from diracx.routers.access_policies import BaseAccessPolicy +from diracx.routers.utils.users import AuthorizedUserInfo + + +class RSSAccessPolicy(BaseAccessPolicy): + """Any authenticated user can access.""" + + @staticmethod + async def policy( + policy_name: str, + user_info: AuthorizedUserInfo, + /, + ): + if user_info.preferred_username: + return + + raise HTTPException(status.HTTP_403_FORBIDDEN) + + +CheckRSSPolicyCallable = Annotated[Callable, Depends(RSSAccessPolicy.check)] diff --git a/diracx-routers/src/diracx/routers/rss/rss.py b/diracx-routers/src/diracx/routers/rss/rss.py new file mode 100644 index 000000000..e0fce2735 --- /dev/null +++ b/diracx-routers/src/diracx/routers/rss/rss.py @@ -0,0 +1,178 @@ +from __future__ import annotations + +import logging +from datetime import datetime, timezone +from typing import Annotated, cast + +from fastapi import ( + Depends, + Header, + HTTPException, + Response, + status, +) + +from diracx.core.models.rss import ( + ComputeElementStatus, + FTSStatus, + SiteStatus, + Snapshot, + StorageElementStatus, +) +from diracx.db.sql.rss.db import ResourceStatusDB +from diracx.logic.rss.source import ( + ComputeElementStatusSource, + FTSStatusSource, + ResourceStatusSource, + SiteStatusSource, + StorageElementStatusSource, +) +from diracx.routers.utils.users import AuthorizedUserInfo, verify_dirac_access_token +from diracx.tasks.plumbing.depends import NoTransaction + +from ..fastapi_classes import DiracxRouter +from .access_policies import CheckRSSPolicyCallable + +logger = logging.getLogger(__name__) + +LAST_MODIFIED_FORMAT = "%a, %d %b %Y %H:%M:%S GMT" + +router = DiracxRouter() + + +async def get_snapshot(rss_type, db) -> Snapshot: + """Get the status snapshot from the unique ResourceStatusSource instance or create it if it does not exist. + + :param rss_type: The type of the resource status source. + :param db: The database instance. + :returns: The status snapshot from the unique ResourceStatusSource instance. + """ + sources: dict[str, ResourceStatusSource | SiteStatusSource] = {} + if rss_type == "ComputeElementStatus": + if rss_type not in sources: + sources[rss_type] = ComputeElementStatusSource(db=db) + return await sources[rss_type].read() + elif rss_type == "StorageElementStatus": + if rss_type not in sources: + sources[rss_type] = StorageElementStatusSource(db=db) + return await sources[rss_type].read() + elif rss_type == "FTSStatus": + if rss_type not in sources: + sources[rss_type] = FTSStatusSource(db=db) + return await sources[rss_type].read() + elif rss_type == "SiteStatus": + if rss_type not in sources: + sources[rss_type] = SiteStatusSource(db=db) + return await sources[rss_type].read() + else: + raise ValueError(f"Unknown resource status source type: {rss_type}") + + +def _apply_cache_headers( + response: Response, + snapshot: Snapshot, + if_none_match: str | None, + if_modified_since: str | None, +) -> None: + """Set ETag / Last-Modified headers and raise 304 when appropriate. + + Raises: + HTTPException(304): when the client's cached copy is still current. + + """ + headers = { + "ETag": snapshot.hexsha, + "Last-Modified": snapshot.modified.strftime(LAST_MODIFIED_FORMAT), + } + + if if_none_match == snapshot.hexsha: + raise HTTPException(status_code=status.HTTP_304_NOT_MODIFIED, headers=headers) + + if if_modified_since: + try: + not_before = datetime.strptime( + if_modified_since, LAST_MODIFIED_FORMAT + ).astimezone(timezone.utc) + except ValueError: + logger.debug( + "Failed to parse If-Modified-Since header: %s", if_modified_since + ) + else: + if not_before > snapshot.modified: + raise HTTPException( + status_code=status.HTTP_304_NOT_MODIFIED, headers=headers + ) + + response.headers.update(headers) + + +@router.get("/storage") +async def get_storage_status( + response: Response, + user_info: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)], + check_permissions: CheckRSSPolicyCallable, + db: Annotated[ResourceStatusDB, NoTransaction()], + if_none_match: Annotated[str | None, Header()] = None, + if_modified_since: Annotated[str | None, Header()] = None, +) -> dict[str, StorageElementStatus]: + """Get the latest status of storage elements, scoped to the caller's VO.""" + snapshot = await get_snapshot("StorageElementStatus", db) + _apply_cache_headers(response, snapshot, if_none_match, if_modified_since) + return cast( + dict[str, StorageElementStatus], + {**snapshot.data.get("all", {}), **snapshot.data.get(user_info.vo, {})}, + ) + + +@router.get("/compute") +async def get_compute_status( + response: Response, + user_info: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)], + check_permissions: CheckRSSPolicyCallable, + db: Annotated[ResourceStatusDB, NoTransaction()], + if_none_match: Annotated[str | None, Header()] = None, + if_modified_since: Annotated[str | None, Header()] = None, +) -> dict[str, ComputeElementStatus]: + """Get the latest status of compute elements, scoped to the caller's VO.""" + snapshot = await get_snapshot("ComputeElementStatus", db) + _apply_cache_headers(response, snapshot, if_none_match, if_modified_since) + return cast( + dict[str, ComputeElementStatus], + {**snapshot.data.get("all", {}), **snapshot.data.get(user_info.vo, {})}, + ) + + +@router.get("/site") +async def get_site_status( + response: Response, + user_info: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)], + check_permissions: CheckRSSPolicyCallable, + db: Annotated[ResourceStatusDB, NoTransaction()], + if_none_match: Annotated[str | None, Header()] = None, + if_modified_since: Annotated[str | None, Header()] = None, +) -> dict[str, SiteStatus]: + """Get the latest status of sites, scoped to the caller's VO.""" + snapshot = await get_snapshot("SiteStatus", db) + _apply_cache_headers(response, snapshot, if_none_match, if_modified_since) + return cast( + dict[str, SiteStatus], + {**snapshot.data.get("all", {}), **snapshot.data.get(user_info.vo, {})}, + ) + + +@router.get("/fts") +async def get_fts_status( + response: Response, + user_info: Annotated[AuthorizedUserInfo, Depends(verify_dirac_access_token)], + check_permissions: CheckRSSPolicyCallable, + db: Annotated[ResourceStatusDB, NoTransaction()], + if_none_match: Annotated[str | None, Header()] = None, + if_modified_since: Annotated[str | None, Header()] = None, +) -> dict[str, FTSStatus]: + """Get the latest status of FTS servers, scoped to the caller's VO.""" + snapshot = await get_snapshot("FTSStatus", db) + _apply_cache_headers(response, snapshot, if_none_match, if_modified_since) + return cast( + dict[str, FTSStatus], + {**snapshot.data.get("all", {}), **snapshot.data.get(user_info.vo, {})}, + ) diff --git a/diracx-routers/tests/test_rss.py b/diracx-routers/tests/test_rss.py new file mode 100644 index 000000000..92056429c --- /dev/null +++ b/diracx-routers/tests/test_rss.py @@ -0,0 +1,188 @@ +from __future__ import annotations + +import asyncio +from datetime import datetime, timezone + +import pytest +from fastapi import status + +pytestmark = pytest.mark.enabled_dependencies( + [ + "AuthSettings", + "ResourceStatusDB", + "SiteStatusSource", + "FTSStatusSource", + "ComputeElementStatusSource", + "StorageElementStatusSource", + "RSSAccessPolicy", + "DevelopmentSettings", + ] +) + + +async def _prepare_rss(client): + """Seed the DB and warm every source cache inside a single connection.""" + from diracx.core.config.sources import AsyncCacheableSource + from diracx.db.sql.rss.db import ResourceStatusDB + + db_override = client.app.dependency_overrides.get(ResourceStatusDB.no_transaction) + if db_override is None: + return + # factory.py stores partial(db_transaction, db_instance); args[0] is the instance. + db = db_override.args[0] + now = datetime.now(tz=timezone.utc) + + # Seed — open one connection, insert all rows, then close it cleanly. + async with db as conn: + for status_type in ("ReadAccess", "WriteAccess", "CheckAccess", "RemoveAccess"): + await conn.insert_resource_status( + name="SE-CERN", + status="Active", + status_type=status_type, + vo="lhcb", + reason="All good", + date_effective=now, + ) + await conn.insert_resource_status( + name="CE-CERN", + status="Active", + status_type="all", + vo="lhcb", + reason="All good", + date_effective=now, + ) + await conn.insert_resource_status( + name="FTS-CERN", + status="Active", + status_type="all", + vo="lhcb", + reason="All good", + date_effective=now, + ) + await conn.insert_site_status( + name="LCG.CERN.cern", + status="Active", + vo="lhcb", + reason="All good", + date_effective=now, + ) + # Connection is now fully closed and _conn ContextVar is reset. + + # Warm each source — each source.read() opens its own fresh connection. + for override in client.app.dependency_overrides.values(): + source = getattr(override, "__self__", None) + if isinstance(source, AsyncCacheableSource): + await source.read() + + +@pytest.fixture +def normal_user_client(client_factory): + with client_factory.normal_user() as client: + asyncio.get_event_loop().run_until_complete(_prepare_rss(client)) + yield client + + +@pytest.fixture +def unauthenticated_client(client_factory): + with client_factory.unauthenticated() as client: + yield client + + +def test_unauthenticated(unauthenticated_client): + response = unauthenticated_client.get("/api/rss/storage") + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + +@pytest.mark.parametrize( + "endpoint", + ["/api/rss/storage", "/api/rss/compute", "/api/rss/site", "/api/rss/fts"], +) +def test_get_resource_status(normal_user_client, endpoint): + r = normal_user_client.get(endpoint) + assert r.status_code == status.HTTP_200_OK, r.json() + assert r.json(), r.text + + last_modified = r.headers["Last-Modified"] + etag = r.headers["ETag"] + + # Matching ETag + matching Last-Modified → 304 + r = normal_user_client.get( + endpoint, + headers={"If-None-Match": etag, "If-Modified-Since": last_modified}, + ) + assert r.status_code == status.HTTP_304_NOT_MODIFIED, r.text + assert not r.text + + # Wrong ETag only → 200 + r = normal_user_client.get( + endpoint, + headers={"If-None-Match": "wrongEtag"}, + ) + assert r.status_code == status.HTTP_200_OK, r.json() + assert r.json(), r.text + + # Past ETag + past timestamp → 200 + r = normal_user_client.get( + endpoint, + headers={ + "If-None-Match": "pastEtag", + "If-Modified-Since": "Mon, 1 Apr 2000 00:42:42 GMT", + }, + ) + assert r.status_code == status.HTTP_200_OK, r.json() + assert r.json(), r.text + + # Wrong ETag + future timestamp → 304 (If-Modified-Since takes effect) + r = normal_user_client.get( + endpoint, + headers={ + "If-None-Match": "futureEtag", + "If-Modified-Since": "Mon, 1 Apr 9999 00:42:42 GMT", + }, + ) + assert r.status_code == status.HTTP_304_NOT_MODIFIED, r.text + assert not r.text + + # Wrong ETag + invalid timestamp → 200 + r = normal_user_client.get( + endpoint, + headers={ + "If-None-Match": "futureEtag", + "If-Modified-Since": "wrong format", + }, + ) + assert r.status_code == status.HTTP_200_OK, r.json() + assert r.json(), r.text + + # Correct ETag + past timestamp → 304 (ETag match takes priority) + r = normal_user_client.get( + endpoint, + headers={ + "If-None-Match": etag, + "If-Modified-Since": "Mon, 1 Apr 2000 00:42:42 GMT", + }, + ) + assert r.status_code == status.HTTP_304_NOT_MODIFIED, r.text + assert not r.text + + # Correct ETag + future timestamp → 304 + r = normal_user_client.get( + endpoint, + headers={ + "If-None-Match": etag, + "If-Modified-Since": "Mon, 1 Apr 9999 00:42:42 GMT", + }, + ) + assert r.status_code == status.HTTP_304_NOT_MODIFIED, r.text + assert not r.text + + # Correct ETag + invalid timestamp → 304 + r = normal_user_client.get( + endpoint, + headers={ + "If-None-Match": etag, + "If-Modified-Since": "wrong format", + }, + ) + assert r.status_code == status.HTTP_304_NOT_MODIFIED, r.text + assert not r.text diff --git a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/_client.py b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/_client.py index b19d47c68..04db73e5f 100644 --- a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/_client.py +++ b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/_client.py @@ -21,11 +21,12 @@ JobsOperations, LollygagOperations, MyOperations, + RssOperations, WellKnownOperations, ) -class Dirac: # pylint: disable=client-accepts-api-version-keyword +class Dirac: # pylint: disable=client-accepts-api-version-keyword,too-many-instance-attributes """Dirac. :ivar well_known: WellKnownOperations operations @@ -40,6 +41,8 @@ class Dirac: # pylint: disable=client-accepts-api-version-keyword :vartype lollygag: _generated.operations.LollygagOperations :ivar my: MyOperations operations :vartype my: _generated.operations.MyOperations + :ivar rss: RssOperations operations + :vartype rss: _generated.operations.RssOperations :keyword endpoint: Service URL. Required. Default value is "". :paramtype endpoint: str """ @@ -78,6 +81,7 @@ def __init__( # pylint: disable=missing-client-constructor-parameter-credential self.jobs = JobsOperations(self._client, self._config, self._serialize, self._deserialize) self.lollygag = LollygagOperations(self._client, self._config, self._serialize, self._deserialize) self.my = MyOperations(self._client, self._config, self._serialize, self._deserialize) + self.rss = RssOperations(self._client, self._config, self._serialize, self._deserialize) def send_request(self, request: HttpRequest, *, stream: bool = False, **kwargs: Any) -> HttpResponse: """Runs the network request through the client's chained policies. diff --git a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/_client.py b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/_client.py index 32b9dad3a..1adec809d 100644 --- a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/_client.py +++ b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/_client.py @@ -21,11 +21,12 @@ JobsOperations, LollygagOperations, MyOperations, + RssOperations, WellKnownOperations, ) -class Dirac: # pylint: disable=client-accepts-api-version-keyword +class Dirac: # pylint: disable=client-accepts-api-version-keyword,too-many-instance-attributes """Dirac. :ivar well_known: WellKnownOperations operations @@ -40,6 +41,8 @@ class Dirac: # pylint: disable=client-accepts-api-version-keyword :vartype lollygag: _generated.aio.operations.LollygagOperations :ivar my: MyOperations operations :vartype my: _generated.aio.operations.MyOperations + :ivar rss: RssOperations operations + :vartype rss: _generated.aio.operations.RssOperations :keyword endpoint: Service URL. Required. Default value is "". :paramtype endpoint: str """ @@ -78,6 +81,7 @@ def __init__( # pylint: disable=missing-client-constructor-parameter-credential self.jobs = JobsOperations(self._client, self._config, self._serialize, self._deserialize) self.lollygag = LollygagOperations(self._client, self._config, self._serialize, self._deserialize) self.my = MyOperations(self._client, self._config, self._serialize, self._deserialize) + self.rss = RssOperations(self._client, self._config, self._serialize, self._deserialize) def send_request( self, request: HttpRequest, *, stream: bool = False, **kwargs: Any diff --git a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/operations/__init__.py b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/operations/__init__.py index 5cfdf7253..d7d250107 100644 --- a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/operations/__init__.py +++ b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/operations/__init__.py @@ -16,6 +16,7 @@ from ._operations import JobsOperations # type: ignore from ._operations import LollygagOperations # type: ignore from ._operations import MyOperations # type: ignore +from ._operations import RssOperations # type: ignore from ._patch import __all__ as _patch_all from ._patch import * @@ -28,6 +29,7 @@ "JobsOperations", "LollygagOperations", "MyOperations", + "RssOperations", ] __all__.extend([p for p in _patch_all if p not in __all__]) # pyright: ignore _patch_sdk() diff --git a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/operations/_operations.py b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/operations/_operations.py index a2e0565c5..fb42de5d0 100644 --- a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/operations/_operations.py +++ b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/aio/operations/_operations.py @@ -56,6 +56,10 @@ build_lollygag_insert_owner_object_request, build_my_pilots_get_pilot_summary_request, build_my_pilots_submit_pilot_request, + build_rss_get_compute_status_request, + build_rss_get_fts_status_request, + build_rss_get_site_status_request, + build_rss_get_storage_status_request, build_well_known_get_installation_metadata_request, build_well_known_get_jwks_request, build_well_known_get_openid_configuration_request, @@ -2605,3 +2609,303 @@ async def pilots_get_pilot_summary(self, **kwargs: Any) -> dict[str, int]: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore + + +class RssOperations: + """ + .. warning:: + **DO NOT** instantiate this class directly. + + Instead, you should access the following operations through + :class:`~_generated.aio.Dirac`'s + :attr:`rss` attribute. + """ + + models = _models + + def __init__(self, *args, **kwargs) -> None: + input_args = list(args) + self._client: AsyncPipelineClient = input_args.pop(0) if input_args else kwargs.pop("client") + self._config: DiracConfiguration = input_args.pop(0) if input_args else kwargs.pop("config") + self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer") + self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer") + + @distributed_trace_async + async def get_storage_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.StorageElementStatus]: + """Get Storage Status. + + Get the latest status of storage elements, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to StorageElementStatus + :rtype: dict[str, ~_generated.models.StorageElementStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.StorageElementStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_storage_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{StorageElementStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace_async + async def get_compute_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.ComputeElementStatus]: + """Get Compute Status. + + Get the latest status of compute elements, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to ComputeElementStatus + :rtype: dict[str, ~_generated.models.ComputeElementStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.ComputeElementStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_compute_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{ComputeElementStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace_async + async def get_site_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.SiteStatus]: + """Get Site Status. + + Get the latest status of sites, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to SiteStatus + :rtype: dict[str, ~_generated.models.SiteStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.SiteStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_site_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{SiteStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace_async + async def get_fts_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.FTSStatus]: + """Get Fts Status. + + Get the latest status of FTS servers, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to FTSStatus + :rtype: dict[str, ~_generated.models.FTSStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.FTSStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_fts_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = await self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{FTSStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore diff --git a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/__init__.py b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/__init__.py index b97d2e439..6684af567 100644 --- a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/__init__.py +++ b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/__init__.py @@ -12,11 +12,17 @@ from ._models import ( # type: ignore + AllowedStatus, + BannedStatus, BodyAuthGetOidcToken, BodyAuthRevokeRefreshTokenByRefreshToken, BodyJobsRescheduleJobs, BodyJobsUnassignBulkJobsSandboxes, + ComputeElementStatus, + ComputeElementStatusAll, ExtendedMetadata, + FTSStatus, + FTSStatusAll, GroupInfo, HTTPValidationError, HeartbeatData, @@ -34,7 +40,14 @@ SearchParamsSearchItem, SetJobStatusReturn, SetJobStatusReturnSuccess, + SiteStatus, + SiteStatusAll, SortSpec, + StorageElementStatus, + StorageElementStatusCheck, + StorageElementStatusRead, + StorageElementStatusRemove, + StorageElementStatusWrite, SummaryParams, SummaryParamsSearchItem, SupportInfo, @@ -59,11 +72,17 @@ from diracx.client._generated.models._patch import patch_sdk as _patch_sdk __all__ = [ + "AllowedStatus", + "BannedStatus", "BodyAuthGetOidcToken", "BodyAuthRevokeRefreshTokenByRefreshToken", "BodyJobsRescheduleJobs", "BodyJobsUnassignBulkJobsSandboxes", + "ComputeElementStatus", + "ComputeElementStatusAll", "ExtendedMetadata", + "FTSStatus", + "FTSStatusAll", "GroupInfo", "HTTPValidationError", "HeartbeatData", @@ -81,7 +100,14 @@ "SearchParamsSearchItem", "SetJobStatusReturn", "SetJobStatusReturnSuccess", + "SiteStatus", + "SiteStatusAll", "SortSpec", + "StorageElementStatus", + "StorageElementStatusCheck", + "StorageElementStatusRead", + "StorageElementStatusRemove", + "StorageElementStatusWrite", "SummaryParams", "SummaryParamsSearchItem", "SupportInfo", diff --git a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/_models.py b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/_models.py index 69b8ffcf1..6953a050f 100644 --- a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/_models.py +++ b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/models/_models.py @@ -16,6 +16,70 @@ JSON = MutableMapping[str, Any] +class AllowedStatus(_serialization.Model): + """AllowedStatus. + + All required parameters must be populated in order to send to server. + + :ivar allowed: Allowed. Required. + :vartype allowed: bool + :ivar warnings: Warnings. + :vartype warnings: str + """ + + _validation = { + "allowed": {"required": True}, + } + + _attribute_map = { + "allowed": {"key": "allowed", "type": "bool"}, + "warnings": {"key": "warnings", "type": "str"}, + } + + def __init__(self, *, allowed: bool, warnings: Optional[str] = None, **kwargs: Any) -> None: + """ + :keyword allowed: Allowed. Required. + :paramtype allowed: bool + :keyword warnings: Warnings. + :paramtype warnings: str + """ + super().__init__(**kwargs) + self.allowed = allowed + self.warnings = warnings + + +class BannedStatus(_serialization.Model): + """BannedStatus. + + All required parameters must be populated in order to send to server. + + :ivar allowed: Allowed. Required. + :vartype allowed: bool + :ivar reason: Reason. + :vartype reason: str + """ + + _validation = { + "allowed": {"required": True}, + } + + _attribute_map = { + "allowed": {"key": "allowed", "type": "bool"}, + "reason": {"key": "reason", "type": "str"}, + } + + def __init__(self, *, allowed: bool, reason: str = "Unknown", **kwargs: Any) -> None: + """ + :keyword allowed: Allowed. Required. + :paramtype allowed: bool + :keyword reason: Reason. + :paramtype reason: str + """ + super().__init__(**kwargs) + self.allowed = allowed + self.reason = reason + + class BodyAuthGetOidcToken(_serialization.Model): """Body_auth_get_oidc_token. @@ -184,6 +248,36 @@ def __init__(self, *, job_ids: list[int], **kwargs: Any) -> None: self.job_ids = job_ids +class ComputeElementStatus(_serialization.Model): + """ComputeElementStatus. + + All required parameters must be populated in order to send to server. + + :ivar all: All. Required. + :vartype all: ~_generated.models.ComputeElementStatusAll + """ + + _validation = { + "all": {"required": True}, + } + + _attribute_map = { + "all": {"key": "all", "type": "ComputeElementStatusAll"}, + } + + def __init__(self, *, all: "_models.ComputeElementStatusAll", **kwargs: Any) -> None: + """ + :keyword all: All. Required. + :paramtype all: ~_generated.models.ComputeElementStatusAll + """ + super().__init__(**kwargs) + self.all = all + + +class ComputeElementStatusAll(_serialization.Model): + """All.""" + + class ExtendedMetadata(_serialization.Model): """ExtendedMetadata. @@ -231,6 +325,36 @@ def __init__( self.gubbins_user_info = gubbins_user_info +class FTSStatus(_serialization.Model): + """FTSStatus. + + All required parameters must be populated in order to send to server. + + :ivar all: All. Required. + :vartype all: ~_generated.models.FTSStatusAll + """ + + _validation = { + "all": {"required": True}, + } + + _attribute_map = { + "all": {"key": "all", "type": "FTSStatusAll"}, + } + + def __init__(self, *, all: "_models.FTSStatusAll", **kwargs: Any) -> None: + """ + :keyword all: All. Required. + :paramtype all: ~_generated.models.FTSStatusAll + """ + super().__init__(**kwargs) + self.all = all + + +class FTSStatusAll(_serialization.Model): + """All.""" + + class GroupInfo(_serialization.Model): """GroupInfo. @@ -1282,6 +1406,36 @@ def __init__( self.last_update_time = last_update_time +class SiteStatus(_serialization.Model): + """SiteStatus. + + All required parameters must be populated in order to send to server. + + :ivar all: All. Required. + :vartype all: ~_generated.models.SiteStatusAll + """ + + _validation = { + "all": {"required": True}, + } + + _attribute_map = { + "all": {"key": "all", "type": "SiteStatusAll"}, + } + + def __init__(self, *, all: "_models.SiteStatusAll", **kwargs: Any) -> None: + """ + :keyword all: All. Required. + :paramtype all: ~_generated.models.SiteStatusAll + """ + super().__init__(**kwargs) + self.all = all + + +class SiteStatusAll(_serialization.Model): + """All.""" + + class SortSpec(_serialization.Model): """SortSpec. @@ -1315,6 +1469,77 @@ def __init__(self, *, parameter: str, direction: Union[str, "_models.SortDirecti self.direction = direction +class StorageElementStatus(_serialization.Model): + """StorageElementStatus. + + All required parameters must be populated in order to send to server. + + :ivar read: Read. Required. + :vartype read: ~_generated.models.StorageElementStatusRead + :ivar write: Write. Required. + :vartype write: ~_generated.models.StorageElementStatusWrite + :ivar check: Check. Required. + :vartype check: ~_generated.models.StorageElementStatusCheck + :ivar remove: Remove. Required. + :vartype remove: ~_generated.models.StorageElementStatusRemove + """ + + _validation = { + "read": {"required": True}, + "write": {"required": True}, + "check": {"required": True}, + "remove": {"required": True}, + } + + _attribute_map = { + "read": {"key": "read", "type": "StorageElementStatusRead"}, + "write": {"key": "write", "type": "StorageElementStatusWrite"}, + "check": {"key": "check", "type": "StorageElementStatusCheck"}, + "remove": {"key": "remove", "type": "StorageElementStatusRemove"}, + } + + def __init__( + self, + *, + read: "_models.StorageElementStatusRead", + write: "_models.StorageElementStatusWrite", + check: "_models.StorageElementStatusCheck", + remove: "_models.StorageElementStatusRemove", + **kwargs: Any + ) -> None: + """ + :keyword read: Read. Required. + :paramtype read: ~_generated.models.StorageElementStatusRead + :keyword write: Write. Required. + :paramtype write: ~_generated.models.StorageElementStatusWrite + :keyword check: Check. Required. + :paramtype check: ~_generated.models.StorageElementStatusCheck + :keyword remove: Remove. Required. + :paramtype remove: ~_generated.models.StorageElementStatusRemove + """ + super().__init__(**kwargs) + self.read = read + self.write = write + self.check = check + self.remove = remove + + +class StorageElementStatusCheck(_serialization.Model): + """Check.""" + + +class StorageElementStatusRead(_serialization.Model): + """Read.""" + + +class StorageElementStatusRemove(_serialization.Model): + """Remove.""" + + +class StorageElementStatusWrite(_serialization.Model): + """Write.""" + + class SummaryParams(_serialization.Model): """SummaryParams. diff --git a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/operations/__init__.py b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/operations/__init__.py index 5cfdf7253..d7d250107 100644 --- a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/operations/__init__.py +++ b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/operations/__init__.py @@ -16,6 +16,7 @@ from ._operations import JobsOperations # type: ignore from ._operations import LollygagOperations # type: ignore from ._operations import MyOperations # type: ignore +from ._operations import RssOperations # type: ignore from ._patch import __all__ as _patch_all from ._patch import * @@ -28,6 +29,7 @@ "JobsOperations", "LollygagOperations", "MyOperations", + "RssOperations", ] __all__.extend([p for p in _patch_all if p not in __all__]) # pyright: ignore _patch_sdk() diff --git a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/operations/_operations.py b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/operations/_operations.py index 7dcaa92ee..7866a140a 100644 --- a/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/operations/_operations.py +++ b/extensions/gubbins/gubbins-client/src/gubbins/client/_generated/operations/_operations.py @@ -647,6 +647,118 @@ def build_my_pilots_get_pilot_summary_request(**kwargs: Any) -> HttpRequest: # return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) +def build_rss_get_storage_status_request( + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any +) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/rss/storage" + + # Construct headers + if if_modified_since is not None: + _headers["if-modified-since"] = _SERIALIZER.header("if_modified_since", if_modified_since, "str") + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + if_match = prep_if_match(etag, match_condition) + if if_match is not None: + _headers["If-Match"] = _SERIALIZER.header("if_match", if_match, "str") + if_none_match = prep_if_none_match(etag, match_condition) + if if_none_match is not None: + _headers["If-None-Match"] = _SERIALIZER.header("if_none_match", if_none_match, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + +def build_rss_get_compute_status_request( + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any +) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/rss/compute" + + # Construct headers + if if_modified_since is not None: + _headers["if-modified-since"] = _SERIALIZER.header("if_modified_since", if_modified_since, "str") + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + if_match = prep_if_match(etag, match_condition) + if if_match is not None: + _headers["If-Match"] = _SERIALIZER.header("if_match", if_match, "str") + if_none_match = prep_if_none_match(etag, match_condition) + if if_none_match is not None: + _headers["If-None-Match"] = _SERIALIZER.header("if_none_match", if_none_match, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + +def build_rss_get_site_status_request( + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any +) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/rss/site" + + # Construct headers + if if_modified_since is not None: + _headers["if-modified-since"] = _SERIALIZER.header("if_modified_since", if_modified_since, "str") + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + if_match = prep_if_match(etag, match_condition) + if if_match is not None: + _headers["If-Match"] = _SERIALIZER.header("if_match", if_match, "str") + if_none_match = prep_if_none_match(etag, match_condition) + if if_none_match is not None: + _headers["If-None-Match"] = _SERIALIZER.header("if_none_match", if_none_match, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + +def build_rss_get_fts_status_request( + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any +) -> HttpRequest: + _headers = case_insensitive_dict(kwargs.pop("headers", {}) or {}) + + accept = _headers.pop("Accept", "application/json") + + # Construct URL + _url = "/api/rss/fts" + + # Construct headers + if if_modified_since is not None: + _headers["if-modified-since"] = _SERIALIZER.header("if_modified_since", if_modified_since, "str") + _headers["Accept"] = _SERIALIZER.header("accept", accept, "str") + if_match = prep_if_match(etag, match_condition) + if if_match is not None: + _headers["If-Match"] = _SERIALIZER.header("if_match", if_match, "str") + if_none_match = prep_if_none_match(etag, match_condition) + if if_none_match is not None: + _headers["If-None-Match"] = _SERIALIZER.header("if_none_match", if_none_match, "str") + + return HttpRequest(method="GET", url=_url, headers=_headers, **kwargs) + + class WellKnownOperations: """ .. warning:: @@ -3181,3 +3293,303 @@ def pilots_get_pilot_summary(self, **kwargs: Any) -> dict[str, int]: return cls(pipeline_response, deserialized, {}) # type: ignore return deserialized # type: ignore + + +class RssOperations: + """ + .. warning:: + **DO NOT** instantiate this class directly. + + Instead, you should access the following operations through + :class:`~_generated.Dirac`'s + :attr:`rss` attribute. + """ + + models = _models + + def __init__(self, *args, **kwargs) -> None: + input_args = list(args) + self._client: PipelineClient = input_args.pop(0) if input_args else kwargs.pop("client") + self._config: DiracConfiguration = input_args.pop(0) if input_args else kwargs.pop("config") + self._serialize: Serializer = input_args.pop(0) if input_args else kwargs.pop("serializer") + self._deserialize: Deserializer = input_args.pop(0) if input_args else kwargs.pop("deserializer") + + @distributed_trace + def get_storage_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.StorageElementStatus]: + """Get Storage Status. + + Get the latest status of storage elements, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to StorageElementStatus + :rtype: dict[str, ~_generated.models.StorageElementStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.StorageElementStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_storage_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{StorageElementStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace + def get_compute_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.ComputeElementStatus]: + """Get Compute Status. + + Get the latest status of compute elements, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to ComputeElementStatus + :rtype: dict[str, ~_generated.models.ComputeElementStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.ComputeElementStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_compute_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{ComputeElementStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace + def get_site_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.SiteStatus]: + """Get Site Status. + + Get the latest status of sites, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to SiteStatus + :rtype: dict[str, ~_generated.models.SiteStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.SiteStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_site_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{SiteStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore + + @distributed_trace + def get_fts_status( + self, + *, + if_modified_since: Optional[str] = None, + etag: Optional[str] = None, + match_condition: Optional[MatchConditions] = None, + **kwargs: Any + ) -> dict[str, _models.FTSStatus]: + """Get Fts Status. + + Get the latest status of FTS servers, scoped to the caller's VO. + + :keyword if_modified_since: Default value is None. + :paramtype if_modified_since: str + :keyword etag: check if resource is changed. Set None to skip checking etag. Default value is + None. + :paramtype etag: str + :keyword match_condition: The match condition to use upon the etag. Default value is None. + :paramtype match_condition: ~azure.core.MatchConditions + :return: dict mapping str to FTSStatus + :rtype: dict[str, ~_generated.models.FTSStatus] + :raises ~azure.core.exceptions.HttpResponseError: + """ + error_map: MutableMapping = { + 401: ClientAuthenticationError, + 404: ResourceNotFoundError, + 409: ResourceExistsError, + 304: ResourceNotModifiedError, + } + if match_condition == MatchConditions.IfNotModified: + error_map[412] = ResourceModifiedError + elif match_condition == MatchConditions.IfPresent: + error_map[412] = ResourceNotFoundError + elif match_condition == MatchConditions.IfMissing: + error_map[412] = ResourceExistsError + error_map.update(kwargs.pop("error_map", {}) or {}) + + _headers = kwargs.pop("headers", {}) or {} + _params = kwargs.pop("params", {}) or {} + + cls: ClsType[dict[str, _models.FTSStatus]] = kwargs.pop("cls", None) + + _request = build_rss_get_fts_status_request( + if_modified_since=if_modified_since, + etag=etag, + match_condition=match_condition, + headers=_headers, + params=_params, + ) + _request.url = self._client.format_url(_request.url) + + _stream = False + pipeline_response: PipelineResponse = self._client._pipeline.run( # pylint: disable=protected-access + _request, stream=_stream, **kwargs + ) + + response = pipeline_response.http_response + + if response.status_code not in [200]: + map_error(status_code=response.status_code, response=response, error_map=error_map) + raise HttpResponseError(response=response) + + deserialized = self._deserialize("{FTSStatus}", pipeline_response.http_response) + + if cls: + return cls(pipeline_response, deserialized, {}) # type: ignore + + return deserialized # type: ignore