Skip to content

Commit e82dbd3

Browse files
vincewhitekgreav
andauthored
[DEV-3371] Add support for triggering ingestor runs via EAS (#39)
Signed-off-by: vince <vince.white@zepben.com> Signed-off-by: Kurt Greaves <kurt.greaves@zepben.com> Co-authored-by: Kurt Greaves <kurt.greaves@zepben.com>
1 parent 03fa5c5 commit e82dbd3

File tree

4 files changed

+398
-2
lines changed

4 files changed

+398
-2
lines changed

changelog.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
* None.
55

66
### New Features
7-
* None.
7+
* Added basic client method `run_ingestor` to run ingestors via EAS's `executeIngestor` graphql mutation.
8+
* Added basic client methods `get_ingestor_run` and `get_ingestor_run_list` to retrieve the records of previous ingestor runs.
89

910
### Enhancements
1011
* None.

src/zepben/eas/client/eas_client.py

Lines changed: 170 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from http import HTTPStatus
1212
from json import dumps
1313
from typing import Optional, List
14+
from dataclasses import asdict
1415

1516
import aiohttp
1617
from aiohttp import ClientSession
@@ -20,6 +21,7 @@
2021
from zepben.eas.client.feeder_load_analysis_input import FeederLoadAnalysisInput
2122
from zepben.eas.client.opendss import OpenDssConfig, GetOpenDssModelsFilterInput, GetOpenDssModelsSortCriteriaInput
2223
from zepben.eas.client.study import Study
24+
from zepben.eas.client.ingestor import IngestorConfigInput, IngestorRunsFilterInput, IngestorRunsSortCriteriaInput
2325
from zepben.eas.client.util import construct_url
2426
from zepben.eas.client.work_package import WorkPackageConfig, FixedTime, TimePeriod, ForecastConfig, FeederConfigs
2527

@@ -861,6 +863,49 @@ async def async_upload_study(self, study: Study):
861863
}
862864
}
863865
}
866+
if self._verify_certificate:
867+
sslcontext = ssl.create_default_context(cafile=self._ca_filename)
868+
869+
async with self.session.post(
870+
construct_url(protocol=self._protocol, host=self._host, port=self._port, path="/api/graphql"),
871+
headers=self._get_request_headers(),
872+
json=json,
873+
ssl=sslcontext if self._verify_certificate else False
874+
) as response:
875+
if response.ok:
876+
return await response.json()
877+
else:
878+
response.raise_for_status()
879+
880+
def run_ingestor(self, run_config: List[IngestorConfigInput]):
881+
"""
882+
Send request to perform an ingestor run
883+
:param run_config: A list of IngestorConfigInput
884+
:return: The HTTP response received from the Evolve App Server after attempting to run the ingestor
885+
"""
886+
return get_event_loop().run_until_complete(
887+
self.async_run_ingestor(run_config))
888+
889+
async def async_run_ingestor(self, run_config: List[IngestorConfigInput]):
890+
"""
891+
Send asynchronous request to perform an ingestor run
892+
:param run_config: A list of IngestorConfigInput
893+
:return: The HTTP response received from the Evolve App Server after attempting to run the ingestor
894+
"""
895+
with warnings.catch_warnings():
896+
if not self._verify_certificate:
897+
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
898+
json = {
899+
"query": """
900+
mutation executeIngestor($runConfig: [IngestorConfigInput!]) {
901+
executeIngestor(runConfig: $runConfig)
902+
}
903+
""",
904+
"variables": {
905+
"runConfig": [asdict(x) for x in run_config],
906+
}
907+
}
908+
864909
if self._verify_certificate:
865910
sslcontext = ssl.create_default_context(cafile=self._ca_filename)
866911

@@ -875,7 +920,131 @@ async def async_upload_study(self, study: Study):
875920
else:
876921
response.raise_for_status()
877922

878-
def run_hosting_capacity_calibration(self, calibration_name: str, local_calibration_time: datetime, feeders: Optional[List[str]] = None):
923+
def get_ingestor_run(self, ingestor_run_id: int):
924+
"""
925+
Send request to retrieve the record of a particular ingestor run.
926+
:param ingestor_run_id: The ID of the ingestor run to retrieve execution information about.
927+
:return: The HTTP response received from the Evolve App Server including the ingestor run information (if found).
928+
"""
929+
return get_event_loop().run_until_complete(
930+
self.async_get_ingestor_run(ingestor_run_id))
931+
932+
async def async_get_ingestor_run(self, ingestor_run_id: int):
933+
"""
934+
Send asynchronous request to retrieve the record of a particular ingestor run.
935+
:param ingestor_run_id: The ID of the ingestor run to retrieve execution information about.
936+
:return: The HTTP response received from the Evolve App Server including the ingestor run information (if found).
937+
"""
938+
with warnings.catch_warnings():
939+
if not self._verify_certificate:
940+
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
941+
json = {
942+
"query": """
943+
query getIngestorRun($id: Int!) {
944+
getIngestorRun(id: $id) {
945+
id
946+
containerRuntimeType,
947+
payload,
948+
token,
949+
status,
950+
startedAt,
951+
statusLastUpdatedAt,
952+
completedAt
953+
}
954+
}
955+
""",
956+
"variables": {
957+
"id": ingestor_run_id,
958+
}
959+
}
960+
961+
if self._verify_certificate:
962+
sslcontext = ssl.create_default_context(cafile=self._ca_filename)
963+
964+
async with self.session.post(
965+
construct_url(protocol=self._protocol, host=self._host, port=self._port, path="/api/graphql"),
966+
headers=self._get_request_headers(),
967+
json=json,
968+
ssl=sslcontext if self._verify_certificate else False
969+
) as response:
970+
if response.ok:
971+
return await response.json()
972+
else:
973+
raise response.raise_for_status()
974+
975+
def get_ingestor_run_list(self, query_filter: Optional[IngestorRunsFilterInput] = None,
976+
query_sort: Optional[IngestorRunsSortCriteriaInput] = None):
977+
"""
978+
Send request to retrieve a list of ingestor run records matching the provided filter parameters.
979+
:param query_filter: An `IngestorRunsFilterInput` object. Only records matching the provided values will be returned.
980+
If not supplied all records will be returned. (Optional)
981+
:param query_sort: An `IngestorRunsSortCriteriaInput` that can control the order of the returned record based on a number of fields. (Optional)
982+
:return: The HTTP response received from the Evolve App Server including all matching ingestor records found.
983+
"""
984+
return get_event_loop().run_until_complete(
985+
self.async_get_ingestor_run_list(query_filter, query_sort))
986+
987+
async def async_get_ingestor_run_list(self, query_filter: Optional[IngestorRunsFilterInput] = None,
988+
query_sort: Optional[IngestorRunsSortCriteriaInput] = None):
989+
"""
990+
Send asynchronous request to retrieve a list of ingestor run records matching the provided filter parameters.
991+
:param query_filter: An `IngestorRunsFilterInput` object. Only records matching the provided values will be returned.
992+
If not supplied all records will be returned. (Optional)
993+
:param query_sort: An `IngestorRunsSortCriteriaInput` that can control the order of the returned record based on a number of fields. (Optional)
994+
:return: The HTTP response received from the Evolve App Server including all matching ingestor records found.
995+
"""
996+
997+
with warnings.catch_warnings():
998+
if not self._verify_certificate:
999+
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
1000+
json = {
1001+
"query": """
1002+
query listIngestorRuns($filter: IngestorRunsFilterInput, $sort: IngestorRunsSortCriteriaInput) {
1003+
listIngestorRuns(filter: $filter, sort: $sort) {
1004+
id
1005+
containerRuntimeType,
1006+
payload,
1007+
token,
1008+
status,
1009+
startedAt,
1010+
statusLastUpdatedAt,
1011+
completedAt
1012+
}
1013+
}
1014+
""",
1015+
"variables": {
1016+
**({"filter": {
1017+
"id": query_filter.id,
1018+
"status": query_filter.status and [state.name for state in query_filter.status],
1019+
"completed": query_filter.completed,
1020+
"containerRuntimeType": query_filter.container_runtime_type and [runtime.name for runtime in
1021+
query_filter.container_runtime_type]
1022+
}} if query_filter else {}),
1023+
**({"sort": {
1024+
"status": query_sort.status and query_sort.status.name,
1025+
"startedAt": query_sort.started_at and query_sort.started_at.name,
1026+
"statusLastUpdatedAt": query_sort.status_last_updated_at and query_sort.status_last_updated_at.name,
1027+
"completedAt": query_sort.completed_at and query_sort.completed_at.name,
1028+
"containerRuntimeType": query_sort.container_runtime_type and query_sort.container_runtime_type.name,
1029+
}} if query_sort else {})
1030+
}
1031+
}
1032+
1033+
if self._verify_certificate:
1034+
sslcontext = ssl.create_default_context(cafile=self._ca_filename)
1035+
1036+
async with self.session.post(
1037+
construct_url(protocol=self._protocol, host=self._host, port=self._port, path="/api/graphql"),
1038+
headers=self._get_request_headers(),
1039+
json=json,
1040+
ssl=sslcontext if self._verify_certificate else False
1041+
) as response:
1042+
if response.ok:
1043+
return await response.json()
1044+
else:
1045+
raise response.raise_for_status()
1046+
1047+
def run_hosting_capacity_calibration(self, calibration_name: str, local_calibration_time: Optional[str] = None, feeders: Optional[List[str]] = None):
8791048
"""
8801049
Send request to run hosting capacity calibration
8811050
:param calibration_name: A string representation of the calibration name

src/zepben/eas/client/ingestor.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Copyright 2025 Zeppelin Bend Pty Ltd
2+
#
3+
# This Source Code Form is subject to the terms of the Mozilla Public
4+
# License, v. 2.0. If a copy of the MPL was not distributed with this
5+
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
6+
from dataclasses import dataclass
7+
from enum import Enum
8+
from typing import Optional, List
9+
from datetime import datetime
10+
11+
__all__ = [
12+
"IngestorConfigInput", "IngestorRuntimeKind", "IngestorRunState", "IngestorRun", "IngestorRunsFilterInput", "Order",
13+
"IngestorRunsSortCriteriaInput"
14+
]
15+
16+
17+
@dataclass
18+
class IngestorConfigInput:
19+
key: str
20+
value: str
21+
22+
23+
class IngestorRuntimeKind(Enum):
24+
AZURE_CONTAINER_APP_JOB = "AZURE_CONTAINER_APP_JOB"
25+
DOCKER = "DOCKER"
26+
ECS = "ECS"
27+
KUBERNETES = "KUBERNETES"
28+
TEMPORAL_KUBERNETES = "TEMPORAL_KUBERNETES"
29+
30+
31+
class IngestorRunState(Enum):
32+
INITIALIZED = "INITIALIZED"
33+
QUEUED = "QUEUED"
34+
STARTED = "STARTED"
35+
RUNNING = "RUNNING"
36+
SUCCESS = "SUCCESS"
37+
FAILURE = "FAILURE"
38+
FAILED_TO_START = "FAILED_TO_START"
39+
40+
41+
@dataclass
42+
class IngestorRun:
43+
id: int
44+
container_runtime_type: Optional[IngestorRuntimeKind]
45+
payload: str
46+
token: str
47+
status: IngestorRunState
48+
started_at: datetime
49+
status_last_updated_at: Optional[datetime]
50+
completedAt: Optional[datetime]
51+
52+
53+
@dataclass
54+
class IngestorRunsFilterInput:
55+
id: Optional[int] = None
56+
status: Optional[List[IngestorRunState]] = None
57+
completed: Optional[bool] = None
58+
container_runtime_type: Optional[List[IngestorRuntimeKind]] = None
59+
60+
61+
class Order(Enum):
62+
ASC = "ASC"
63+
DESC = "DESC"
64+
65+
66+
@dataclass
67+
class IngestorRunsSortCriteriaInput:
68+
status: Optional[Order] = None
69+
started_at: Optional[Order] = None
70+
status_last_updated_at: Optional[Order] = None
71+
completed_at: Optional[Order] = None
72+
container_runtime_type: Optional[Order] = None

0 commit comments

Comments
 (0)