4 changes: 2 additions & 2 deletions Dockerfile
@@ -22,7 +22,7 @@ WORKDIR /app

 COPY ./backend/pyproject.toml ./backend/poetry.lock /app/
 ENV PIP_NO_CACHE_DIR=1
-RUN pip install poetry && \
+RUN pip install poetry==1.8.5 && \
     python -m venv --copies /app/venv && \
     . /app/venv/bin/activate && \
     poetry install --without=dev --no-interaction
@@ -37,7 +37,7 @@ WORKDIR /app

 COPY --from=backend /app /app
 COPY --from=frontend /build/out /app/static
-ENV PATH /app/venv/bin:$PATH
+ENV PATH=/app/venv/bin:$PATH
 COPY ./backend /app

 # install streams_explorer package
195 changes: 136 additions & 59 deletions backend/poetry.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion backend/pyproject.toml
@@ -37,7 +37,7 @@ uvicorn = { extras = ["standard"], version = "^0.20.0" }
 pygraphviz = "^1.10"
 confluent-kafka = "2.4.0"
 cachetools = "^4.2.2"
-kubernetes-asyncio = "^23.6.0"
+kubernetes-asyncio = "^32.0.0"

 [tool.poetry.group.dev.dependencies]
 mypy = "^0.981"
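For context on this bump: `kubernetes-asyncio` tracks the upstream Kubernetes client, and CronJob graduated from `batch/v1beta1` to `batch/v1` (GA in Kubernetes 1.21, with the beta API removed in 1.25). The newer client therefore ships only the `batch/v1` bindings, which drives the `V1beta1CronJob` → `V1CronJob` renames throughout the files below (and the regenerated `poetry.lock` above). A minimal sketch of the import migration:

```python
# Old bindings (kubernetes-asyncio ^23, batch/v1beta1):
# from kubernetes_asyncio.client import V1beta1CronJob, V1beta1CronJobList

# New bindings (kubernetes-asyncio ^32, batch/v1):
from kubernetes_asyncio.client import V1CronJob, V1CronJobList
```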
@@ -2,7 +2,7 @@

 from typing import TYPE_CHECKING

-from kubernetes_asyncio.client import V1beta1CronJob, V1Job
+from kubernetes_asyncio.client import V1CronJob, V1Job

 import streams_explorer.core.k8s_app as k8s
 from streams_explorer.core.extractor.extractor import ProducerAppExtractor
@@ -24,7 +24,7 @@ def on_job_parsing(self, job: V1Job) -> K8sAppJob | None:
         if producer.is_streams_app():
             return producer

-    def on_cron_job_parsing(self, cron_job: V1beta1CronJob) -> K8sAppCronJob | None:
+    def on_cron_job_parsing(self, cron_job: V1CronJob) -> K8sAppCronJob | None:
         producer = k8s.K8sAppCronJob(cron_job)
         if producer.is_streams_app():
             return producer
4 changes: 2 additions & 2 deletions backend/streams_explorer/core/extractor/extractor.py
@@ -4,7 +4,7 @@
 from dataclasses import dataclass, field
 from typing import TYPE_CHECKING

-from kubernetes_asyncio.client import V1beta1CronJob, V1Job
+from kubernetes_asyncio.client import V1CronJob, V1Job

 from streams_explorer.models.k8s import K8sConfig
 from streams_explorer.models.kafka_connector import KafkaConnector
@@ -50,5 +50,5 @@ def on_job_parsing(self, job: V1Job) -> K8sAppJob | None:
         ...

     @abstractmethod
-    def on_cron_job_parsing(self, cron_job: V1beta1CronJob) -> K8sAppCronJob | None:
+    def on_cron_job_parsing(self, cron_job: V1CronJob) -> K8sAppCronJob | None:
         ...
@@ -2,7 +2,7 @@

 from typing import TYPE_CHECKING, NamedTuple

-from kubernetes_asyncio.client import V1beta1CronJob, V1Job
+from kubernetes_asyncio.client import V1CronJob, V1Job
 from loguru import logger

 from streams_explorer.core.extractor.default.generic import GenericSink, GenericSource
@@ -73,7 +73,7 @@ def on_job(self, job: V1Job) -> K8sAppJob | None:
             if app := extractor.on_job_parsing(job):
                 return app

-    def on_cron_job(self, cron_job: V1beta1CronJob) -> K8sAppCronJob | None:
+    def on_cron_job(self, cron_job: V1CronJob) -> K8sAppCronJob | None:
         for extractor in self.extractors:
             if isinstance(extractor, ProducerAppExtractor):
                 if app := extractor.on_cron_job_parsing(cron_job):
8 changes: 4 additions & 4 deletions backend/streams_explorer/core/k8s_app.py
@@ -3,8 +3,8 @@
 from typing import TypeAlias

 from kubernetes_asyncio.client import (
-    V1beta1CronJob,
     V1Container,
+    V1CronJob,
     V1Deployment,
     V1Job,
     V1ObjectMeta,
@@ -21,7 +21,7 @@

 ATTR_PIPELINE = "pipeline"

-K8sObject: TypeAlias = V1Deployment | V1StatefulSet | V1Job | V1beta1CronJob
+K8sObject: TypeAlias = V1Deployment | V1StatefulSet | V1Job | V1CronJob

 config_parser: type[K8sConfigParser] = load_config_parser()

@@ -165,7 +165,7 @@ def factory(k8s_object: K8sObject) -> K8sApp:
             return K8sAppStatefulSet(k8s_object)
         case V1Job():  # type: ignore[misc]
             return K8sAppJob(k8s_object)
-        case V1beta1CronJob():  # type: ignore[misc]
+        case V1CronJob():  # type: ignore[misc]
             return K8sAppCronJob(k8s_object)
         case _:
             raise ValueError(k8s_object)
@@ -216,7 +216,7 @@ def __set_attributes(self) -> None:


 class K8sAppCronJob(K8sApp):
-    def __init__(self, k8s_object: V1CronJob) -> None:
+    def __init__(self, k8s_object: V1CronJob) -> None:
         super().__init__(k8s_object)

     def setup(self) -> None:
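The `case V1CronJob():` arms above use structural pattern matching, where a class pattern is effectively an `isinstance` check; the `# type: ignore[misc]` comments suggest the generated kubernetes models are not accepted as match patterns by the type checker. A standalone sketch of the dispatch shape, with stand-in classes rather than the real models:

```python
# Minimal sketch of class-pattern dispatch, mirroring K8sApp.factory.
# Deployment/CronJob here are stand-ins, not the kubernetes_asyncio types.
class Deployment: ...
class CronJob: ...

def kind(obj: object) -> str:
    match obj:
        case Deployment():  # class pattern: matches via isinstance()
            return "deployment"
        case CronJob():
            return "cronjob"
        case _:
            raise ValueError(f"unsupported object: {obj!r}")

assert kind(CronJob()) == "cronjob"
```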
28 changes: 16 additions & 12 deletions backend/streams_explorer/core/services/dataflow_graph.py
@@ -73,7 +73,7 @@ def _add_streaming_app(self, graph: nx.DiGraph, app: K8sApp) -> None:
         graph.add_node(
             app.id,
             label=app.name,
-            node_type=NodeTypesEnum.STREAMING_APP,
+            node_type=NodeTypesEnum.STREAMING_APP.value,
             **app.attributes,
         )

@@ -106,7 +106,7 @@ def add_connector(
         graph.add_node(
             connector.name,
             label=connector.name,
-            node_type=NodeTypesEnum.CONNECTOR,
+            node_type=NodeTypesEnum.CONNECTOR.value,
         )
         for topic in connector.get_topics():
             self._add_topic(graph, topic)
@@ -131,8 +131,8 @@ def add_source(self, source: Source) -> None:
         node: GraphNode = (
             source.name,
             {
-                NodeDataFields.LABEL: source.name,
-                NodeDataFields.NODE_TYPE: source.node_type,
+                NodeDataFields.LABEL.value: source.name,
+                NodeDataFields.NODE_TYPE.value: source.node_type,
             },
         )
         edge: GraphEdge = (source.name, source.target)
@@ -141,7 +141,10 @@ def add_sink(self, sink: Sink) -> None:
     def add_sink(self, sink: Sink) -> None:
         node: GraphNode = (
             sink.name,
-            {NodeDataFields.LABEL: sink.name, NodeDataFields.NODE_TYPE: sink.node_type},
+            {
+                NodeDataFields.LABEL.value: sink.name,
+                NodeDataFields.NODE_TYPE.value: sink.node_type,
+            },
         )
         edge: GraphEdge = (sink.source, sink.name)
         self.add_to_graph(node, edge, reverse=True)
@@ -150,7 +153,8 @@ def add_to_graph(
         self, node: GraphNode, edge: GraphEdge, reverse: bool = False
     ) -> None:
         node_name, node_data = node
-        self.graph.update(nodes=[node], edges=[edge])
+        self.graph.add_node(node_name, **node_data)
+        self.graph.add_edge(*edge)

         if pipelines := self.find_associated_pipelines(node_name, reverse=reverse):
             target = (set(edge) - {node_name}).pop()
@@ -184,7 +188,7 @@ async def get_metrics(self) -> list[Metric]:

     def get_node_type(self, id: str) -> NodeTypesEnum:
         try:
-            return self.graph.nodes[id][NodeDataFields.NODE_TYPE]
+            return self.graph.nodes[id][NodeDataFields.NODE_TYPE.value]
         except KeyError:
             raise NodeNotFound()

@@ -209,17 +213,17 @@ def find_associated_pipelines(

     @staticmethod
     def _add_topic(graph: nx.DiGraph, name: str) -> None:
-        graph.add_node(name, label=name, node_type=NodeTypesEnum.TOPIC)
+        graph.add_node(name, label=name, node_type=NodeTypesEnum.TOPIC.value)

     @staticmethod
     def _filter_topic_node_ids(graph: nx.DiGraph) -> set[str]:
         return {
             node_id
             for node_id, data in graph.nodes(data=True)
             if data[  # pyright: ignore[reportOptionalSubscript]
-                NodeDataFields.NODE_TYPE
+                NodeDataFields.NODE_TYPE.value
             ]
-            in (NodeTypesEnum.TOPIC, NodeTypesEnum.ERROR_TOPIC)
+            in (NodeTypesEnum.TOPIC.value, NodeTypesEnum.ERROR_TOPIC.value)
         }

     @staticmethod
@@ -326,8 +330,8 @@ def _add_error_topic(
         graph.add_node(
             topic_name,
             **{
-                NodeDataFields.LABEL: topic_name,
-                NodeDataFields.NODE_TYPE: NodeTypesEnum.ERROR_TOPIC,
+                NodeDataFields.LABEL.value: topic_name,
+                NodeDataFields.NODE_TYPE.value: NodeTypesEnum.ERROR_TOPIC.value,
             },
         )
         graph.add_edge(app_id, topic_name)
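The switch from enum members to `.value` throughout this file stores plain strings in the graph's node attributes. A plain `Enum` member does not compare equal to its raw value, so consumers that compare or serialize node data as strings need the `.value` form; a small illustration (the enum definition here is an assumption, not the project's exact one):

```python
import json
from enum import Enum

class NodeTypesEnum(Enum):  # assumed shape; the real enum may differ
    TOPIC = "topic"

# An Enum member is not equal to its underlying value:
assert NodeTypesEnum.TOPIC != "topic"
assert NodeTypesEnum.TOPIC.value == "topic"

# Storing .value keeps node attributes as JSON-friendly plain strings:
assert json.dumps({"node_type": NodeTypesEnum.TOPIC.value}) == '{"node_type": "topic"}'
```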
15 changes: 7 additions & 8 deletions backend/streams_explorer/core/services/kubernetes.py
@@ -13,8 +13,8 @@
     ApiException,
     EventsV1Event,
     EventsV1EventList,
-    V1beta1CronJob,
-    V1beta1CronJobList,
+    V1CronJob,
+    V1CronJobList,
     V1Deployment,
     V1DeploymentList,
     V1Job,
@@ -39,7 +39,7 @@ class K8sResource(NamedTuple):
         V1DeploymentList
         | V1StatefulSetList
         | V1JobList
-        | V1beta1CronJobList
+        | V1CronJobList
         | EventsV1EventList,
     ]
     return_type: type | None
@@ -95,7 +95,7 @@ async def setup(self) -> None:
                 logger.info("Setup K8s environment in cluster")
                 kubernetes_asyncio.config.load_incluster_config()
             else:
-                logger.info("Setup K8s environment")
+                logger.info(f"Setup K8s environment for context {self.context}")
                 await kubernetes_asyncio.config.load_kube_config(context=self.context)
         except kubernetes_asyncio.config.ConfigException as e:
             raise Exception("Could not load K8s environment configuration") from e
@@ -106,7 +106,6 @@ async def setup(self) -> None:

         self.k8s_app_client = kubernetes_asyncio.client.AppsV1Api()
         self.k8s_batch_client = kubernetes_asyncio.client.BatchV1Api()
-        self.k8s_beta_batch_client = kubernetes_asyncio.client.BatchV1beta1Api()
         self.k8s_events_client = kubernetes_asyncio.client.EventsV1Api()

     async def watch(self) -> None:
@@ -125,8 +124,8 @@ def list_jobs(namespace: str, *args, **kwargs) -> V1JobList:
                 *args, namespace=namespace, **kwargs
             )

-        def list_cron_jobs(namespace: str, *args, **kwargs) -> V1beta1CronJobList:
-            return self.k8s_beta_batch_client.list_namespaced_cron_job(
+        def list_cron_jobs(namespace: str, *args, **kwargs) -> V1CronJobList:
+            return self.k8s_batch_client.list_namespaced_cron_job(
                 *args, namespace=namespace, **kwargs
             )

@@ -153,7 +152,7 @@ def list_events(namespace: str, *args, **kwargs) -> EventsV1EventList:
             ),
             K8sResource(
                 list_cron_jobs,
-                V1beta1CronJob,
+                V1CronJob,
                 self.streams_explorer.handle_deployment_update,
             ),
             K8sResource(
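With CronJob served by `batch/v1`, a single `BatchV1Api` client now covers both Jobs and CronJobs, which is why the separate `BatchV1beta1Api` instance disappears above. A hedged sketch of the consolidated listing (assumes the client configuration has already been loaded):

```python
import kubernetes_asyncio
from kubernetes_asyncio.client import V1CronJobList, V1JobList

async def list_batch_resources(namespace: str) -> tuple[V1JobList, V1CronJobList]:
    # One batch/v1 client serves both resource kinds.
    batch = kubernetes_asyncio.client.BatchV1Api()
    jobs = await batch.list_namespaced_job(namespace=namespace)
    cron_jobs = await batch.list_namespaced_cron_job(namespace=namespace)
    return jobs, cron_jobs
```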
4 changes: 2 additions & 2 deletions backend/streams_explorer/streams_explorer.py
@@ -1,6 +1,6 @@
 from cachetools.func import ttl_cache
 from fastapi import WebSocket
-from kubernetes_asyncio.client import V1beta1CronJob, V1Job
+from kubernetes_asyncio.client import V1CronJob, V1Job
 from loguru import logger

 from streams_explorer.core.client_manager import ClientManager
@@ -157,7 +157,7 @@ async def handle_deployment_update(self, update: K8sDeploymentUpdate) -> None:
         match item:
             case V1Job():  # type: ignore[misc]
                 app = extractor_container.on_job(item)
-            case V1beta1CronJob():  # type: ignore[misc]
+            case V1CronJob():  # type: ignore[misc]
                 app = extractor_container.on_cron_job(item)
             case _:
                 app = K8sApp.factory(item)
6 changes: 3 additions & 3 deletions backend/tests/test_application.py
@@ -7,7 +7,7 @@
 from fastapi.testclient import TestClient
 from kubernetes_asyncio.client import (
     EventsV1Event,
-    V1beta1CronJob,
+    V1CronJob,
     V1Deployment,
     V1ObjectMeta,
     V1ObjectReference,
@@ -71,8 +71,8 @@ def stateful_sets(self) -> list[V1StatefulSet]:
         return []

     @pytest.fixture()
-    def cron_jobs(self) -> list[V1beta1CronJob]:
-        return [V1beta1CronJob(metadata=V1ObjectMeta(name="test"))]
+    def cron_jobs(self) -> list[V1CronJob]:
+        return [V1CronJob(metadata=V1ObjectMeta(name="test"))]

     @pytest.mark.asyncio
     async def test_update_every_x_seconds(
16 changes: 8 additions & 8 deletions backend/tests/test_extractors.py
@@ -2,13 +2,13 @@

 import pytest
 from kubernetes_asyncio.client import (
-    V1beta1CronJob,
-    V1beta1CronJobSpec,
-    V1beta1JobTemplateSpec,
     V1Container,
+    V1CronJob,
+    V1CronJobSpec,
     V1EnvVar,
     V1Job,
     V1JobSpec,
+    V1JobTemplateSpec,
     V1ObjectMeta,
     V1OwnerReference,
     V1PodSpec,
@@ -78,7 +78,7 @@ def on_connector_info_parsing(
 from pathlib import Path
 from typing import TYPE_CHECKING

-from kubernetes_asyncio.client import V1beta1CronJob, V1Job
+from kubernetes_asyncio.client import V1CronJob, V1Job
 from streams_explorer.models.k8s import K8sConfig
 from streams_explorer.core.extractor.extractor import (
     ProducerAppExtractor,
@@ -98,7 +98,7 @@ def on_streaming_app_delete(self, config: K8sConfig) -> None:
     def on_job_parsing(self, job: V1Job) -> K8sAppJob | None:
         pass

-    def on_cron_job_parsing(self, cron_job: V1beta1CronJob) -> K8sAppCronJob | None:
+    def on_cron_job_parsing(self, cron_job: V1CronJob) -> K8sAppCronJob | None:
         pass
 """

@@ -445,8 +445,8 @@ def test_streams_bootstrap_producer(self):
         pod_spec = V1PodSpec(containers=[container])
         pod_template_spec = V1PodTemplateSpec(spec=pod_spec)
         job_spec = V1JobSpec(template=pod_template_spec, selector=None)
-        job_template = V1beta1JobTemplateSpec(spec=job_spec)
-        spec = V1beta1CronJobSpec(job_template=job_template, schedule="* * * * *")
+        job_template = V1JobTemplateSpec(spec=job_spec)
+        spec = V1CronJobSpec(job_template=job_template, schedule="* * * * *")
         name = "test-files-import"
         metadata = V1ObjectMeta(
             name=name,
@@ -461,7 +461,7 @@ def test_streams_bootstrap_producer(self):
             },
             namespace="test-namespace",
         )
-        cron_job = V1beta1CronJob(metadata=metadata, spec=spec)
+        cron_job = V1CronJob(metadata=metadata, spec=spec)
         app_cron_job = extractor.on_cron_job_parsing(cron_job)
         assert isinstance(app_cron_job, K8sAppCronJob), "should extract CronJob"
7 changes: 4 additions & 3 deletions backend/tests/test_kubernetes.py
@@ -32,7 +32,7 @@ async def test_watch(kubernetes: Kubernetes, mocker: MockFixture):
         call("V1Deployment"),
         call("V1StatefulSet"),
         call("V1Job"),
-        call("V1beta1CronJob"),
+        call("V1CronJob"),
     ]
     mock_Watch.__aenter__.assert_awaited()
     assert mock_Watch.__aenter__.await_count == 4
@@ -46,7 +46,7 @@ async def test_watch(kubernetes: Kubernetes, mocker: MockFixture):
         "V1Deployment",
         "V1StatefulSet",
         "V1Job",
-        "V1beta1CronJob",
+        "V1CronJob",
         "EventsV1Event",
     ]

@@ -103,7 +103,8 @@ async def mock_callback() -> None:
     assert e.value.reason == "Internal Server Error"

     mock_kubernetes_asyncio_watch.return_value.__aenter__.side_effect = ApiException(
-        status=409, reason="Expired"  # demo error, doesn't exist
+        status=409,
+        reason="Expired",  # demo error, doesn't exist
     )

     with pytest.raises(ApiException) as e:
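The mocked `call("V1CronJob")` assertions reflect how `kubernetes_asyncio.watch.Watch` is constructed with a model name so raw watch events deserialize into typed objects. A rough sketch of that pattern (illustrative only; the production watcher wires in its own callbacks):

```python
import kubernetes_asyncio
from kubernetes_asyncio import watch

async def watch_cron_jobs(namespace: str) -> None:
    batch = kubernetes_asyncio.client.BatchV1Api()
    # return_type tells the watch how to deserialize each event's object.
    w = watch.Watch(return_type="V1CronJob")
    async with w.stream(batch.list_namespaced_cron_job, namespace=namespace) as stream:
        async for event in stream:
            # event is a dict with "type" (ADDED/MODIFIED/DELETED) and "object"
            print(event["type"], event["object"].metadata.name)
```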