Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
276473a
add models
wesjdj Jan 30, 2026
cc9f460
add db.py
wesjdj Feb 2, 2026
910b7d8
add rest of component
wesjdj Feb 10, 2026
283b851
fix: correct orm and repository
wesjdj Feb 10, 2026
7229e64
fix: wire capacity reservation component into migrations and blueprint
wesjdj Feb 10, 2026
03d7afd
add migration and style checks
wesjdj Feb 10, 2026
2528e86
make end_date required and generate occurrences on create
wesjdj Feb 10, 2026
0192af3
wire up occurrenceadapter
wesjdj Feb 11, 2026
4d4d063
add active_at_timestamp to occurrences
wesjdj Feb 12, 2026
b93b773
add tasks
wesjdj Feb 13, 2026
1773a32
add task defs, wire in
wesjdj Feb 16, 2026
54553c6
fix: get k8s namespace from cluster connection
wesjdj Feb 16, 2026
e136c09
delete orphaned placeholder deployments
wesjdj Feb 17, 2026
f7df1f5
remove unused API endpoints
wesjdj Feb 17, 2026
c818cad
fix: name is on object
wesjdj Feb 18, 2026
73127cf
add K8s client wrapper for capacity reservations
wesjdj Feb 18, 2026
85b0780
use resource_class_id as focus for capacity reservations
wesjdj Feb 18, 2026
606a067
fix: regenerate schemas
wesjdj Feb 18, 2026
351de96
fix: schemathesis test fixes
wesjdj Feb 19, 2026
aa08b98
fix: style checks
wesjdj Feb 19, 2026
0c2b9bd
refactor: standardise naming across components
wesjdj Feb 20, 2026
fbe5d9c
generate appropriate placeholder deployment name
wesjdj Feb 23, 2026
ed66364
support node affinities
wesjdj Feb 23, 2026
d43f260
read default tolerations from env var
wesjdj Feb 23, 2026
095bf5d
read default node selectors and affinities from env var
wesjdj Feb 23, 2026
4b2590b
chore: add resource class id to cr deployment name
wesjdj Feb 24, 2026
2b4a0dd
use helm chart provided priorityclass, if it exists
wesjdj Feb 24, 2026
a6a3f15
chore: add jsonb versioning
wesjdj Feb 26, 2026
4ca32cc
addressing some review comments
wesjdj Feb 27, 2026
637b493
addressing remaining PR feedback
wesjdj Mar 2, 2026
3bfec47
fix: style checks
wesjdj Mar 2, 2026
dd7f84b
filter out hibernated AM sessions
wesjdj Mar 2, 2026
f024d5f
Merge branch 'main' into add-capacity-reservation-system
wesjdj Mar 2, 2026
89fa8b7
fix: stale import
wesjdj Mar 2, 2026
943be39
fix target migration head
wesjdj Mar 3, 2026
494d5ab
Merge branch 'main' into add-capacity-reservation-system
olevski Mar 3, 2026
e51b695
squashme: minor fix
olevski Mar 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ API_SPECS := \
components/renku_data_services/platform/apispec.py \
components/renku_data_services/data_connectors/apispec.py \
components/renku_data_services/search/apispec.py \
components/renku_data_services/notifications/apispec.py
components/renku_data_services/notifications/apispec.py \
components/renku_data_services/capacity_reservation/apispec.py

schemas: ${API_SPECS} ## Generate pydantic classes from apispec yaml files
@echo "generated classes based on ApiSpec"
Expand Down
9 changes: 9 additions & 0 deletions bases/renku_data_services/data_api/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from renku_data_services.base_api.error_handler import CustomErrorHandler
from renku_data_services.base_api.misc import MiscBP
from renku_data_services.base_models.core import Slug
from renku_data_services.capacity_reservation.blueprints import CapacityReservationBP
from renku_data_services.connected_services.blueprints import OAuth2ClientsBP, OAuth2ConnectionsBP
from renku_data_services.crc import apispec
from renku_data_services.crc.blueprints import (
Expand Down Expand Up @@ -265,6 +266,13 @@ def register_all_handlers(app: Sanic, dm: DependencyManager) -> Sanic:
authenticator=dm.authenticator,
alertmanager_webhook_role=dm.config.alertmanager_webhook_role,
)
capacity_reservation = CapacityReservationBP(
name="capacity_reservation",
url_prefix=url_prefix,
capacity_reservation_repo=dm.capacity_reservation_repo,
occurrence_repo=dm.occurrence_repo,
authenticator=dm.authenticator,
)
app.blueprint(
[
resource_pools.blueprint(),
Expand Down Expand Up @@ -293,6 +301,7 @@ def register_all_handlers(app: Sanic, dm: DependencyManager) -> Sanic:
data_connectors.blueprint(),
platform_redirects.blueprint(),
notifications.blueprint(),
capacity_reservation.blueprint(),
]
)
if builds is not None:
Expand Down
14 changes: 14 additions & 0 deletions bases/renku_data_services/data_api/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from yaml import safe_load

import renku_data_services.base_models as base_models
import renku_data_services.capacity_reservation
import renku_data_services.connected_services
import renku_data_services.crc
import renku_data_services.data_connectors
Expand All @@ -24,6 +25,7 @@
from renku_data_services.authn.gitlab import EmptyGitlabAuthenticator, GitlabAuthenticator
from renku_data_services.authn.keycloak import KcUserStore, KeycloakAuthenticator
from renku_data_services.authz.authz import Authz
from renku_data_services.capacity_reservation.db import CapacityReservationRepository, OccurrenceRepository
from renku_data_services.connected_services.db import ConnectedServicesRepository
from renku_data_services.connected_services.oauth_http import DefaultOAuthHttpClientFactory, OAuthHttpClientFactory
from renku_data_services.crc import models as crc_models
Expand Down Expand Up @@ -154,6 +156,8 @@ class DependencyManager:
git_provider_helper: GitProviderHelperProto
notifications_repo: NotificationsRepository
oauth_http_client_factory: OAuthHttpClientFactory
capacity_reservation_repo: CapacityReservationRepository
occurrence_repo: OccurrenceRepository

spec: dict[str, Any] = field(init=False, repr=False, default_factory=dict)
app_name: str = "renku_data_services"
Expand Down Expand Up @@ -182,6 +186,7 @@ def load_apispec() -> dict[str, Any]:
renku_data_services.data_connectors.__file__,
renku_data_services.search.__file__,
renku_data_services.notifications.__file__,
renku_data_services.capacity_reservation.__file__,
]

api_specs = []
Expand Down Expand Up @@ -405,6 +410,13 @@ def from_env(cls) -> DependencyManager:
session_maker=config.db.async_session_maker,
alertmanager_webhook_role=config.alertmanager_webhook_role,
)
capacity_reservation_repo = CapacityReservationRepository(
session_maker=config.db.async_session_maker,
cluster_repo=cluster_repo,
)
occurrence_repo = OccurrenceRepository(
session_maker=config.db.async_session_maker,
)
return cls(
config,
authenticator=authenticator,
Expand Down Expand Up @@ -445,4 +457,6 @@ def from_env(cls) -> DependencyManager:
git_provider_helper=git_provider_helper,
notifications_repo=notifications_repo,
oauth_http_client_factory=oauth_http_client_factory,
capacity_reservation_repo=capacity_reservation_repo,
occurrence_repo=occurrence_repo,
)
7 changes: 7 additions & 0 deletions bases/renku_data_services/data_tasks/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ class Config:
main_log_interval_seconds: int
tcp_host: str
tcp_port: int
x_short_task_period_s: int
short_task_period_s: int
long_task_period_s: int
k8s_config_root: str

@classmethod
def from_env(cls) -> Config:
Expand All @@ -63,9 +65,12 @@ def from_env(cls) -> Config:
tcp_host = os.environ.get("TCP_HOST", "127.0.0.1")
tcp_port = int(os.environ.get("TCP_PORT", "8001"))

x_short_task_period = int(os.environ.get("X_SHORT_TASK_PERIOD_S", 30))
short_task_period = int(os.environ.get("SHORT_TASK_PERIOD_S", 2 * 60))
long_task_period = int(os.environ.get("LONG_TASK_PERIOD_S", 3 * 60 * 60))

k8s_config_root = os.environ.get("K8S_CONFIG_ROOT", "/secrets/kube_configs")

authz = AuthzConfig.from_env()

keycloak = None if dummy_stores else KeycloakConfig.from_env()
Expand All @@ -79,7 +84,9 @@ def from_env(cls) -> Config:
keycloak=keycloak,
tcp_host=tcp_host,
tcp_port=tcp_port,
x_short_task_period_s=x_short_task_period,
short_task_period_s=short_task_period,
long_task_period_s=long_task_period,
k8s_config_root=k8s_config_root,
dummy_stores=dummy_stores,
)
30 changes: 30 additions & 0 deletions bases/renku_data_services/data_tasks/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,19 @@
from dataclasses import dataclass

from renku_data_services.authz.authz import Authz
from renku_data_services.capacity_reservation.db import CapacityReservationRepository, OccurrenceRepository
from renku_data_services.capacity_reservation.k8s_client import CapacityReservationK8sClient
from renku_data_services.capacity_reservation.tasks import CapacityReservationTasks
from renku_data_services.crc.db import ClusterRepository
from renku_data_services.data_tasks.config import Config
from renku_data_services.k8s.clients import K8sClusterClientsPool
from renku_data_services.k8s.config import KubeConfigEnv
from renku_data_services.k8s.db import K8sDbCache
from renku_data_services.metrics.core import StagingMetricsService
from renku_data_services.metrics.db import MetricsRepository
from renku_data_services.namespace.db import GroupRepository
from renku_data_services.notebooks.config import get_clusters
from renku_data_services.notebooks.constants import AMALTHEA_SESSION_GVK
from renku_data_services.project.db import ProjectRepository
from renku_data_services.search.db import SearchUpdatesRepo
from renku_data_services.session.db import SessionRepository
Expand All @@ -30,6 +39,7 @@ class DependencyManager:
syncer: UsersSync
kc_api: IKeycloakAPI
session_tasks: SessionTasks
capacity_reservation_tasks: CapacityReservationTasks

@classmethod
def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
Expand Down Expand Up @@ -71,6 +81,25 @@ def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
authz=authz,
)
session_tasks = SessionTasks(session_environment_repo=session_environment_repo)
cluster_repo = ClusterRepository(session_maker=cfg.db.async_session_maker)
k8s_db_cache = K8sDbCache(cfg.db.async_session_maker)
k8s_client = K8sClusterClientsPool(
lambda: get_clusters(
kube_conf_root_dir=cfg.k8s_config_root,
default_kubeconfig=KubeConfigEnv(),
cluster_repo=cluster_repo,
cache=k8s_db_cache,
kinds_to_cache=[AMALTHEA_SESSION_GVK],
)
Comment thread
wesjdj marked this conversation as resolved.
)
cr_k8s_client = CapacityReservationK8sClient(client=k8s_client, cluster_repo=cluster_repo)
capacity_reservation_tasks = CapacityReservationTasks(
occurrence_repo=OccurrenceRepository(cfg.db.async_session_maker),
capacity_reservation_repo=CapacityReservationRepository(
cfg.db.async_session_maker, cluster_repo=cluster_repo
),
k8s_client=cr_k8s_client,
)
kc_api: IKeycloakAPI
if cfg.dummy_stores:
dummy_users = [
Expand All @@ -97,4 +126,5 @@ def from_env(cls, cfg: Config | None = None) -> "DependencyManager":
syncer=syncer,
kc_api=kc_api,
session_tasks=session_tasks,
capacity_reservation_tasks=capacity_reservation_tasks,
)
36 changes: 36 additions & 0 deletions bases/renku_data_services/data_tasks/task_defs.py
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,39 @@ async def initialize_session_environments(dm: DependencyManager) -> None:
await dm.session_tasks.initialize_session_environments_task(requested_by=api_user)


async def activate_capacity_reservations(dm: DependencyManager) -> None:
    """Periodically activate pending capacity reservation occurrences.

    Runs ``dm.capacity_reservation_tasks.activate_pending_occurrences_task()``
    in an endless loop, sleeping ``dm.config.x_short_task_period_s`` seconds
    after every successful run.

    Raises:
        asyncio.CancelledError: when the surrounding task is cancelled.
        KeyboardInterrupt: when the process is interrupted.

    Any other exception raised by the task is not handled here and propagates
    to the caller.
    """
    while True:
        try:
            await dm.capacity_reservation_tasks.activate_pending_occurrences_task()
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Propagate the shutdown request. Swallowing it here would make the
            # loop call the task again immediately (the sleep lives in `else`)
            # and the coroutine could never be cancelled.
            raise
        else:
            await asyncio.sleep(dm.config.x_short_task_period_s)


async def monitor_capacity_reservations(dm: DependencyManager) -> None:
    """Periodically monitor active capacity reservation occurrences.

    Runs ``dm.capacity_reservation_tasks.monitor_active_occurrences_task()``
    in an endless loop, sleeping ``dm.config.x_short_task_period_s`` seconds
    after every successful run.

    Raises:
        asyncio.CancelledError: when the surrounding task is cancelled.
        KeyboardInterrupt: when the process is interrupted.

    Any other exception raised by the task is not handled here and propagates
    to the caller.
    """
    while True:
        try:
            await dm.capacity_reservation_tasks.monitor_active_occurrences_task()
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Propagate the shutdown request. Swallowing it here would make the
            # loop call the task again immediately (the sleep lives in `else`)
            # and the coroutine could never be cancelled.
            raise
        else:
            await asyncio.sleep(dm.config.x_short_task_period_s)


async def cleanup_orphaned_capacity_reservations(dm: DependencyManager) -> None:
    """Periodically clean up capacity reservation deployments whose occurrences no longer exist.

    Runs ``dm.capacity_reservation_tasks.cleanup_orphaned_deployments_task()``
    in an endless loop, sleeping ``dm.config.x_short_task_period_s`` seconds
    after every successful run.

    Raises:
        asyncio.CancelledError: when the surrounding task is cancelled.
        KeyboardInterrupt: when the process is interrupted.

    Any other exception raised by the task is not handled here and propagates
    to the caller.
    """
    while True:
        try:
            await dm.capacity_reservation_tasks.cleanup_orphaned_deployments_task()
        except (asyncio.CancelledError, KeyboardInterrupt) as e:
            logger.warning(f"Exiting: {e}")
            # Propagate the shutdown request. Swallowing it here would make the
            # loop call the task again immediately (the sleep lives in `else`)
            # and the coroutine could never be cancelled.
            raise
        else:
            await asyncio.sleep(dm.config.x_short_task_period_s)


def all_tasks(dm: DependencyManager) -> TaskDefininions:
"""A dict of task factories to be managed in main."""
# Impl. note: We pass the entire config to the coroutines, because
Expand All @@ -421,5 +454,8 @@ def all_tasks(dm: DependencyManager) -> TaskDefininions:
"users_sync": lambda: users_sync(dm),
"sync_admins_from_keycloak": lambda: sync_admins_from_keycloak(dm),
"initialize_session_environments": lambda: initialize_session_environments(dm),
"activate_capacity_reservations": lambda: activate_capacity_reservations(dm),
"monitor_capacity_reservations": lambda: monitor_capacity_reservations(dm),
"cleanup_orphaned_capacity_reservations": lambda: cleanup_orphaned_capacity_reservations(dm),
}
)
1 change: 1 addition & 0 deletions components/renku_data_services/base_models/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ class ServiceAdminId(StrEnum):
secrets_rotation = "secrets_rotation"
k8s_watcher = "k8s_watcher"
search_reprovision = "search_reprovision"
capacity_reservation = "capacity_reservation"


@dataclass(kw_only=True, frozen=True)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Blueprints for capacity reservations."""
Loading
Loading