Skip to content

Commit a1fc18c

Browse files
committed
seems to work
Signed-off-by: vince <vince.white@zepben.com>
1 parent 03fa5c5 commit a1fc18c

File tree

4 files changed

+401
-2
lines changed

4 files changed

+401
-2
lines changed

changelog.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44
* None.
55

66
### New Features
7-
* None.
7+
* Added basic client method `run_ingestor` to run ingestors via EAS's `executeIngestor` graphql mutation.
8+
* Added basic client methods `get_ingestor_run` and `get_ingestor_run_list` to retrieve the records of previous ingestor runs.
89

910
### Enhancements
1011
* None.

src/zepben/eas/client/eas_client.py

Lines changed: 173 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
from http import HTTPStatus
1212
from json import dumps
1313
from typing import Optional, List
14+
from dataclasses import asdict
1415

1516
import aiohttp
1617
from aiohttp import ClientSession
@@ -20,6 +21,7 @@
2021
from zepben.eas.client.feeder_load_analysis_input import FeederLoadAnalysisInput
2122
from zepben.eas.client.opendss import OpenDssConfig, GetOpenDssModelsFilterInput, GetOpenDssModelsSortCriteriaInput
2223
from zepben.eas.client.study import Study
24+
from zepben.eas.client.ingestor import IngestorConfigInput, IngestorRunsFilterInput, IngestorRunsSortCriteriaInput
2325
from zepben.eas.client.util import construct_url
2426
from zepben.eas.client.work_package import WorkPackageConfig, FixedTime, TimePeriod, ForecastConfig, FeederConfigs
2527

@@ -861,6 +863,50 @@ async def async_upload_study(self, study: Study):
861863
}
862864
}
863865
}
866+
if self._verify_certificate:
867+
sslcontext = ssl.create_default_context(cafile=self._ca_filename)
868+
869+
async with self.session.post(
870+
construct_url(protocol=self._protocol, host=self._host, port=self._port, path="/api/graphql"),
871+
headers=self._get_request_headers(),
872+
json=json,
873+
ssl=sslcontext if self._verify_certificate else False
874+
) as response:
875+
if response.ok:
876+
response = await response.json()
877+
else:
878+
response = await response.text()
879+
return response
880+
881+
def run_ingestor(self, run_config: List[IngestorConfigInput]):
882+
"""
883+
Send request to perform an ingestor run
884+
:param run_config: A list of IngestorConfigInput
885+
:return: The HTTP response received from the Evolve App Server after attempting to run the ingestor
886+
"""
887+
return get_event_loop().run_until_complete(
888+
self.async_run_ingestor(run_config))
889+
890+
async def async_run_ingestor(self, run_config: List[IngestorConfigInput]):
891+
"""
892+
Send asynchronous request to perform an ingestor run
893+
:param run_config: A list of IngestorConfigInput
894+
:return: The HTTP response received from the Evolve App Server after attempting to run the ingestor
895+
"""
896+
with warnings.catch_warnings():
897+
if not self._verify_certificate:
898+
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
899+
json = {
900+
"query": """
901+
mutation executeIngestor($runConfig: [IngestorConfigInput!]) {
902+
executeIngestor(runConfig: $runConfig)
903+
}
904+
""",
905+
"variables": {
906+
"runConfig": [asdict(x) for x in run_config],
907+
}
908+
}
909+
864910
if self._verify_certificate:
865911
sslcontext = ssl.create_default_context(cafile=self._ca_filename)
866912

@@ -875,7 +921,133 @@ async def async_upload_study(self, study: Study):
875921
else:
876922
response.raise_for_status()
877923

878-
def run_hosting_capacity_calibration(self, calibration_name: str, local_calibration_time: datetime, feeders: Optional[List[str]] = None):
924+
def get_ingestor_run(self, ingestor_run_id: int):
925+
"""
926+
Send request to retrieve the record of a particular ingestor run.
927+
:param ingestor_run_id: The ID of the ingestor run to retrieve execution information about.
928+
:return: The HTTP response received from the Evolve App Server including the ingestor run information (if found).
929+
"""
930+
return get_event_loop().run_until_complete(
931+
self.async_get_ingestor_run(ingestor_run_id))
932+
933+
async def async_get_ingestor_run(self, ingestor_run_id: int):
934+
"""
935+
Send asynchronous request to retrieve the record of a particular ingestor run.
936+
:param ingestor_run_id: The ID of the ingestor run to retrieve execution information about.
937+
:return: The HTTP response received from the Evolve App Server including the ingestor run information (if found).
938+
"""
939+
with warnings.catch_warnings():
940+
if not self._verify_certificate:
941+
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
942+
json = {
943+
"query": """
944+
query getIngestorRun($id: Int!) {
945+
getIngestorRun(id: $id) {
946+
id
947+
containerRuntimeType,
948+
payload,
949+
token,
950+
status,
951+
startedAt,
952+
statusLastUpdatedAt,
953+
completedAt
954+
}
955+
}
956+
""",
957+
"variables": {
958+
"id": ingestor_run_id,
959+
}
960+
}
961+
962+
if self._verify_certificate:
963+
sslcontext = ssl.create_default_context(cafile=self._ca_filename)
964+
965+
async with self.session.post(
966+
construct_url(protocol=self._protocol, host=self._host, port=self._port, path="/api/graphql"),
967+
headers=self._get_request_headers(),
968+
json=json,
969+
ssl=sslcontext if self._verify_certificate else False
970+
) as response:
971+
if response.ok:
972+
response = await response.json()
973+
else:
974+
response = await response.text()
975+
return response
976+
977+
def get_ingestor_run_list(self, query_filter: Optional[IngestorRunsFilterInput] = None,
978+
query_sort: Optional[IngestorRunsSortCriteriaInput] = None):
979+
"""
980+
Send request to retrieve a list of ingestor run records matching the provided filter parameters.
981+
:param query_filter: An `IngestorRunsFilterInput` object. Only records matching the provided values will be returned.
982+
If not supplied all records will be returned. (Optional)
983+
:param query_sort: An `IngestorRunsSortCriteriaInput` that can control the order of the returned record based on a number of fields. (Optional)
984+
:return: The HTTP response received from the Evolve App Server including all matching ingestor records found.
985+
"""
986+
return get_event_loop().run_until_complete(
987+
self.async_get_ingestor_run_list(query_filter, query_sort))
988+
989+
async def async_get_ingestor_run_list(self, query_filter: Optional[IngestorRunsFilterInput] = None,
990+
query_sort: Optional[IngestorRunsSortCriteriaInput] = None):
991+
"""
992+
Send asynchronous request to retrieve a list of ingestor run records matching the provided filter parameters.
993+
:param query_filter: An `IngestorRunsFilterInput` object. Only records matching the provided values will be returned.
994+
If not supplied all records will be returned. (Optional)
995+
:param query_sort: An `IngestorRunsSortCriteriaInput` that can control the order of the returned record based on a number of fields. (Optional)
996+
:return: The HTTP response received from the Evolve App Server including all matching ingestor records found.
997+
"""
998+
999+
with warnings.catch_warnings():
1000+
if not self._verify_certificate:
1001+
warnings.filterwarnings("ignore", category=InsecureRequestWarning)
1002+
json = {
1003+
"query": """
1004+
query listIngestorRuns($filter: IngestorRunsFilterInput, $sort: IngestorRunsSortCriteriaInput) {
1005+
listIngestorRuns(filter: $filter, sort: $sort) {
1006+
id
1007+
containerRuntimeType,
1008+
payload,
1009+
token,
1010+
status,
1011+
startedAt,
1012+
statusLastUpdatedAt,
1013+
completedAt
1014+
}
1015+
}
1016+
""",
1017+
"variables": {
1018+
**({"filter": {
1019+
"id": query_filter.id,
1020+
"status": query_filter.status and [state.name for state in query_filter.status],
1021+
"completed": query_filter.completed,
1022+
"containerRuntimeType": query_filter.container_runtime_type and [runtime.name for runtime in
1023+
query_filter.container_runtime_type]
1024+
}} if query_filter else {}),
1025+
**({"sort": {
1026+
"status": query_sort.status and query_sort.status.name,
1027+
"startedAt": query_sort.started_at and query_sort.started_at.name,
1028+
"statusLastUpdatedAt": query_sort.status_last_updated_at and query_sort.status_last_updated_at.name,
1029+
"completedAt": query_sort.completed_at and query_sort.completed_at.name,
1030+
"containerRuntimeType": query_sort.container_runtime_type and query_sort.container_runtime_type.name,
1031+
}} if query_sort else {})
1032+
}
1033+
}
1034+
1035+
if self._verify_certificate:
1036+
sslcontext = ssl.create_default_context(cafile=self._ca_filename)
1037+
1038+
async with self.session.post(
1039+
construct_url(protocol=self._protocol, host=self._host, port=self._port, path="/api/graphql"),
1040+
headers=self._get_request_headers(),
1041+
json=json,
1042+
ssl=sslcontext if self._verify_certificate else False
1043+
) as response:
1044+
if response.ok:
1045+
response = await response.json()
1046+
else:
1047+
response = await response.text()
1048+
return response
1049+
1050+
def run_hosting_capacity_calibration(self, calibration_name: str, local_calibration_time: Optional[str] = None, feeders: Optional[List[str]] = None):
8791051
"""
8801052
Send request to run hosting capacity calibration
8811053
:param calibration_name: A string representation of the calibration name

src/zepben/eas/client/ingestor.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
# Copyright 2025 Zeppelin Bend Pty Ltd
2+
#
3+
# This Source Code Form is subject to the terms of the Mozilla Public
4+
# License, v. 2.0. If a copy of the MPL was not distributed with this
5+
# file, You can obtain one at https://mozilla.org/MPL/2.0/.
6+
from dataclasses import dataclass
7+
from enum import Enum
8+
from typing import Optional, List
9+
from datetime import datetime
10+
11+
__all__ = [
12+
"IngestorConfigInput", "IngestorRuntimeKind", "IngestorRunState", "IngestorRun", "IngestorRunsFilterInput", "Order",
13+
"IngestorRunsSortCriteriaInput"
14+
]
15+
16+
17+
@dataclass
18+
class IngestorConfigInput:
19+
key: str
20+
value: str
21+
22+
23+
class IngestorRuntimeKind(Enum):
24+
AZURE_CONTAINER_APP_JOB = "AZURE_CONTAINER_APP_JOB"
25+
DOCKER = "DOCKER"
26+
ECS = "ECS"
27+
KUBERNETES = "KUBERNETES"
28+
TEMPORAL_KUBERNETES = "TEMPORAL_KUBERNETES"
29+
30+
31+
class IngestorRunState(Enum):
32+
INITIALIZED = "INITIALIZED"
33+
QUEUED = "QUEUED"
34+
STARTED = "STARTED"
35+
RUNNING = "RUNNING"
36+
SUCCESS = "SUCCESS"
37+
FAILURE = "FAILURE"
38+
FAILED_TO_START = "FAILED_TO_START"
39+
40+
41+
@dataclass
42+
class IngestorRun:
43+
id: int
44+
container_runtime_type: Optional[IngestorRuntimeKind]
45+
payload: str
46+
token: str
47+
status: IngestorRunState
48+
started_at: datetime
49+
status_last_updated_at: Optional[datetime]
50+
completedAt: Optional[datetime]
51+
52+
53+
@dataclass
54+
class IngestorRunsFilterInput:
55+
id: Optional[int] = None
56+
status: Optional[List[IngestorRunState]] = None
57+
completed: Optional[bool] = None
58+
container_runtime_type: Optional[List[IngestorRuntimeKind]] = None
59+
60+
61+
class Order(Enum):
62+
ASC = "ASC"
63+
DESC = "DESC"
64+
65+
66+
@dataclass
67+
class IngestorRunsSortCriteriaInput:
68+
status: Optional[Order] = None
69+
started_at: Optional[Order] = None
70+
status_last_updated_at: Optional[Order] = None
71+
completed_at: Optional[Order] = None
72+
container_runtime_type: Optional[Order] = None

0 commit comments

Comments
 (0)