Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions src/powerapi/cli/generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,14 +444,17 @@ def _k8s_pre_processor_factory(processor_config: dict) -> ProcessorActor:
:param processor_config: Pre-Processor configuration
:return: Configured Kubernetes pre-processor actor
"""
from powerapi.processor.pre.k8s.actor import K8sPreProcessorActor, K8sProcessorConfig
name = processor_config[ACTOR_NAME_KEY]
from powerapi.processor.pre.k8s.actor import K8sPreProcessorActor
from powerapi.processor.pre.k8s.monitor_agent import K8sMonitorConfig

api_mode = processor_config[K8S_API_MODE_KEY]
api_host = processor_config.get(K8S_API_HOST_KEY, None)
api_key = processor_config.get(K8S_API_KEY_KEY, None)
monitor_config = K8sMonitorConfig(api_mode, api_host, api_key)

name = processor_config[ACTOR_NAME_KEY]
level_logger = logging.DEBUG if processor_config[GENERAL_CONF_VERBOSE_KEY] else logging.INFO
config = K8sProcessorConfig(api_mode, api_host, api_key)
return K8sPreProcessorActor(name, config, level_logger)
return K8sPreProcessorActor(name, monitor_config, level_logger)

@staticmethod
def _openstack_pre_processor_factory(processor_config: dict) -> ProcessorActor:
Expand Down
28 changes: 7 additions & 21 deletions src/powerapi/processor/pre/k8s/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import logging
from dataclasses import dataclass
from multiprocessing import Manager

from powerapi.actor import Actor, State
Expand All @@ -38,60 +37,47 @@
from .handlers import K8sPreProcessorActorHWPCReportHandler
from .handlers import K8sPreProcessorActorStartMessageHandler, K8sPreProcessorActorPoisonPillMessageHandler
from .metadata_cache_manager import K8sMetadataCacheManager
from .monitor_agent import K8sMonitorAgent


@dataclass
class K8sProcessorConfig:
"""
Kubernetes processor actor configuration.
:param api_mode: Kubernetes API mode (manual, local, cluster)
:param api_host: Kubernetes API host to connect to
:param api_key: Kubernetes API key (Bearer Token) to authenticate with
"""
api_mode: str | None = None
api_host: str | None = None
api_key: str | None = None
from .monitor_agent import K8sMonitorAgent, K8sMonitorConfig


class K8sProcessorState(State):
"""
State of the Kubernetes processor actor.
"""

def __init__(self, actor: Actor, config: K8sProcessorConfig):
def __init__(self, actor: Actor, monitor_config: K8sMonitorConfig):
"""
Initializes a Kubernetes pre-processor state.
"""
super().__init__(actor)

self.manager = Manager()
self.metadata_cache_manager = K8sMetadataCacheManager(self.manager)
self.monitor_agent = K8sMonitorAgent(self.metadata_cache_manager, config.api_mode, config.api_host, config.api_key)
self.monitor_agent = K8sMonitorAgent(self.metadata_cache_manager, monitor_config)


class K8sPreProcessorActor(ProcessorActor):
"""
Pre-Processor Actor that adds Kubernetes related metadata to reports.
"""

def __init__(self, name: str, config: K8sProcessorConfig, level_logger: int = logging.WARNING, timeout: int = 5000):
def __init__(self, name: str, monitor_config: K8sMonitorConfig, level_logger: int = logging.WARNING, timeout: int = 5000):
"""
Initializes a Kubernetes pre-processor actor.
:param name: The name of the actor
:param config: Configuration of the actor
:param monitor_config: Configuration of the monitoring agent
:param level_logger: logging level of the actor
:param timeout: timeout in seconds
"""
super().__init__(name, level_logger, timeout)

self.config = config
self.monitor_config = monitor_config

def setup(self):
"""
Set up the Kubernetes pre-processor actor.
"""
self.state = K8sProcessorState(self, self.config)
self.state = K8sProcessorState(self, self.monitor_config)

self.add_handler(StartMessage, K8sPreProcessorActorStartMessageHandler(self.state))
self.add_handler(HWPCReport, K8sPreProcessorActorHWPCReportHandler(self.state))
Expand Down
7 changes: 4 additions & 3 deletions src/powerapi/processor/pre/k8s/metadata_cache_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from collections.abc import MutableMapping
from dataclasses import dataclass
from multiprocessing import Manager
from multiprocessing.managers import SyncManager

ADDED_EVENT = 'ADDED'
DELETED_EVENT = 'DELETED'
Expand All @@ -52,11 +53,11 @@ class K8sMetadataCacheManager:
Kubernetes container metadata cache manager.
"""

def __init__(self, manager: Manager):
def __init__(self, manager: SyncManager):
"""
:param manager: Manager of the shared metadata cache
"""
self.metadata_cache: dict[str, K8sContainerMetadata] = manager.dict()
self.metadata_cache: MutableMapping[str, K8sContainerMetadata] = manager.dict()

def update_container_metadata(self, event: str, container_metadata: K8sContainerMetadata):
"""
Expand Down
135 changes: 69 additions & 66 deletions src/powerapi/processor/pre/k8s/monitor_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,86 +29,90 @@

import logging
import sys
from multiprocessing import Process
from dataclasses import dataclass
from multiprocessing import Process, Event
from signal import signal, SIGTERM, SIGINT
from time import sleep

from kubernetes import client, config, watch
from kubernetes.client import V1Pod, V1PodList, V1ContainerStatus
from kubernetes.client.configuration import Configuration
from kubernetes.client.rest import ApiException
from urllib3.exceptions import ProtocolError

from .metadata_cache_manager import ADDED_EVENT, MODIFIED_EVENT, DELETED_EVENT
from .metadata_cache_manager import K8sMetadataCacheManager, K8sContainerMetadata
from .metadata_cache_manager import K8sMetadataCacheManager, K8sContainerMetadata, ADDED_EVENT, MODIFIED_EVENT, DELETED_EVENT

LOCAL_CONFIG_MODE = "local"
MANUAL_CONFIG_MODE = "manual"
CLUSTER_CONFIG_MODE = "cluster"
K8S_MONITOR_RETRY_DELAY_SECONDS = 1.0


def _setup_k8s_client_with_local_config() -> None:
@dataclass(frozen=True)
class K8sMonitorConfig:
"""
Setup Kubernetes API client with a kube-config file. (from KUBECONFIG environment variable, or ~/.kube/config)
Kubernetes monitoring agent configuration.
:param api_mode: Kubernetes API mode (manual, local, cluster)
:param api_host: Kubernetes API host to connect to
:param api_key: Kubernetes API key (Bearer Token) to authenticate with
"""
config.load_kube_config()
api_mode: str
api_host: str | None = None
api_key: str | None = None


def _setup_k8s_client_with_cluster_config() -> None:
def load_manual_k8s_config(configuration: client.Configuration, api_host: str | None, api_key: str | None) -> None:
"""
Setup Kubernetes API client with the pod service account. (requires PowerAPI to be running in a Kubernetes cluster)
Setup Kubernetes API client configuration manually.
This method only supports authentication by Bearer Token.
:param configuration: Kubernetes API client configuration
:param api_host: The Kubernetes API host
:param api_key: The Kubernetes API key (Bearer Token)
"""
config.load_incluster_config()
if not api_host:
raise ValueError('Kubernetes API host is not defined')

if not api_key:
raise ValueError('Kubernetes API key is not defined')

def _setup_k8s_client_with_manual_config(host: str, api_key: str) -> None:
"""
Setup Kubernetes API client with the user provided configuration. (Bearer Token)
:param host: Kubernetes API host url.
:param api_key: Kubernetes API token.
"""
configuration = client.Configuration()

configuration.host = host or 'http://localhost'
configuration.api_key["authorization"] = api_key

Configuration.set_default(configuration)
configuration.host = api_host
configuration.api_key['authorization'] = api_key
configuration.api_key_prefix['authorization'] = 'Bearer'


def load_k8s_api_client_configuration(api_mode: str, api_host: str, api_key: str) -> None:
def build_k8s_api_client_configuration(api_mode: str, api_host: str | None, api_key: str | None) -> client.Configuration:
"""
Setup Kubernetes API client according to the selected mode.
:param api_mode: API mode (manual, local, cluster)
:param api_host: API host to connect to
:param api_key: API key (Bearer Token) to authenticate with
Build a Kubernetes API client configuration.
:param api_mode: The Kubernetes API mode (manual, local, cluster)
:param api_host: The Kubernetes API host
:param api_key: The Kubernetes API key (Bearer Token)
:return: Kubernetes API client configuration
"""
if api_mode.casefold() == MANUAL_CONFIG_MODE:
_setup_k8s_client_with_manual_config(api_host, api_key)
return

if api_mode.casefold() == CLUSTER_CONFIG_MODE:
_setup_k8s_client_with_cluster_config()
return
configuration = client.Configuration()
match api_mode.casefold():
case 'local':
# Setup Kubernetes API client with a kube-config file. (from KUBECONFIG environment variable, or ~/.kube/config)
config.load_kube_config(client_configuration=configuration)
case 'cluster':
# Setup Kubernetes API client with the pod service account. (requires PowerAPI to be running in a pod)
config.load_incluster_config(client_configuration=configuration)
case 'manual':
load_manual_k8s_config(configuration, api_host, api_key)
case _:
raise ValueError(f'Invalid Kubernetes API mode: {api_mode}')

# load local configuration by default.
_setup_k8s_client_with_local_config()
return configuration


class K8sMonitorAgent(Process):
"""
Background monitoring agent that update the shared metadata cache from Kubernetes API events.
"""

def __init__(self, cache_manager: K8sMetadataCacheManager, api_mode: str, api_host: str, api_key: str, level_logger: int = logging.WARNING):
def __init__(self, cache_manager: K8sMetadataCacheManager, conf: K8sMonitorConfig, level_logger: int = logging.WARNING):
"""
:param K8sMetadataCacheManager cache_manager: Metadata cache manager
:param str api_mode: The Kubernetes API mode (manual, local, cluster)
:param str api_host: The Kubernetes API host
:param str api_key: The Kubernetes API key (Bearer Token)
:param conf: Configuration of the k8s processor actor
:param int level_logger: The logger level
"""
super().__init__(name='k8s-processor-monitor-agent')

#: (logging.Logger): Logger
self.logger = logging.getLogger(self.name)
self.logger.setLevel(level_logger)
formatter = logging.Formatter('%(asctime)s || %(levelname)s || ' + '%(process)d %(processName)s || %(message)s')
Expand All @@ -117,27 +121,25 @@ def __init__(self, cache_manager: K8sMetadataCacheManager, api_mode: str, api_ho

self.metadata_cache_manager = cache_manager

self.k8s_api = self._setup_k8s_api_client(api_mode, api_host, api_key)

self.stop_monitoring = False
self._api_config = build_k8s_api_client_configuration(conf.api_mode, conf.api_host, conf.api_key)
self._stop_monitoring = Event()

@staticmethod
def _setup_k8s_api_client(api_mode: str, api_host: str, api_key: str) -> client.CoreV1Api:
def build_k8s_api_client(api_config: client.Configuration) -> client.CoreV1Api:
"""
Setup Kubernetes API client.
:param api_mode: The Kubernetes API mode (manual, local, cluster)
:param api_host: The Kubernetes API host
:param api_key: The Kubernetes API key (Bearer Token)
Build a Kubernetes API client with the given configuration.
:param api_config: Kubernetes API configuration
:return: Kubernetes API client
"""
load_k8s_api_client_configuration(api_mode, api_host, api_key)
return client.CoreV1Api()
api_client = client.ApiClient(configuration=api_config)
return client.CoreV1Api(api_client)

def _setup_signal_handlers(self):
"""
Setup signal handlers for the current Process.
"""
def stop_monitor(_, __):
self.stop_monitoring = True
self._stop_monitoring.set()
sys.exit(0)

signal(SIGTERM, stop_monitor)
Expand All @@ -149,13 +151,13 @@ def run(self):
"""
self._setup_signal_handlers()

# Clearing the metadata cache before starting prevents having orphaned entries
# that will never be deleted because they no longer exist in the Kubernetes API.
self.metadata_cache_manager.clear_metadata_cache()
self.metadata_cache_manager.clear_metadata_cache() # Prevents orphaned cache entries.

while not self.stop_monitoring:
resource_id = self.fetch_list_all_pod_for_all_namespaces()
self.watch_list_pod_for_all_namespaces(resource_id)
api_client = self.build_k8s_api_client(self._api_config)
while not self._stop_monitoring.is_set():
resource_id = self.fetch_list_all_pod_for_all_namespaces(api_client)
self.watch_list_pod_for_all_namespaces(api_client, resource_id)
sleep(K8S_MONITOR_RETRY_DELAY_SECONDS)

@staticmethod
def get_containers_id_name_from_statuses(container_statuses: list[V1ContainerStatus]) -> dict[str, str]:
Expand Down Expand Up @@ -184,14 +186,15 @@ def build_metadata_cache_entries_from_pod(self, pod: V1Pod) -> list[K8sContainer
for container_id, container_name in self.get_containers_id_name_from_statuses(container_statuses).items()
]

def fetch_list_all_pod_for_all_namespaces(self) -> int | None:
def fetch_list_all_pod_for_all_namespaces(self, api_client: client.CoreV1Api) -> int | None:
"""
Fetch all pod for all namespaces and populate the metadata cache.
:param api_client: Kubernetes api client
:return: Resource version of the last fetched entry
"""
resource_version = None
try:
pods: V1PodList = self.k8s_api.list_pod_for_all_namespaces(watch=False)
pods: V1PodList = api_client.list_pod_for_all_namespaces(watch=False)
resource_version = pods.metadata.resource_version
for pod in pods.items:
for entry in self.build_metadata_cache_entries_from_pod(pod):
Expand All @@ -204,16 +207,16 @@ def fetch_list_all_pod_for_all_namespaces(self) -> int | None:

return resource_version

def watch_list_pod_for_all_namespaces(self, resource_version: int | None = None):
def watch_list_pod_for_all_namespaces(self, api_client: client.CoreV1Api, resource_version: int | None = None):
"""
Watch k8s pods events for all namespaces and update the local metadata cache accordingly.
:param api_client: Kubernetes API client
:param resource_version: Resource version from where the watcher begin
"""
try:
w = watch.Watch()
for event in w.stream(self.k8s_api.list_pod_for_all_namespaces, resource_version=resource_version):
for event in w.stream(api_client.list_pod_for_all_namespaces, resource_version=resource_version):
event_type = event["type"]

if event_type not in {ADDED_EVENT, MODIFIED_EVENT, DELETED_EVENT}:
logging.warning('Unexpected pod event: %s', event_type)
continue
Expand Down
6 changes: 3 additions & 3 deletions tests/unit/cli/test_generator_k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,9 @@ def test_preprocessor_generator_with_valid_k8s_config(k8s_processor_config):
assert isinstance(preprocessor, K8sPreProcessorActor)

expected_preprocessor_attributes = k8s_processor_config['pre-processor']['pytest-k8s-preprocessor']
assert preprocessor.config.api_mode == expected_preprocessor_attributes['api-mode']
assert preprocessor.config.api_key == expected_preprocessor_attributes['api-key']
assert preprocessor.config.api_host == expected_preprocessor_attributes['api-host']
assert preprocessor.monitor_config.api_mode == expected_preprocessor_attributes['api-mode']
assert preprocessor.monitor_config.api_key == expected_preprocessor_attributes['api-key']
assert preprocessor.monitor_config.api_host == expected_preprocessor_attributes['api-host']


@pytest.mark.parametrize('missing_arg', ['api-mode'])
Expand Down
6 changes: 4 additions & 2 deletions tests/unit/processor/pre/k8s/test_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@

pytest.importorskip('powerapi.processor.pre.k8s.actor') # The actor module requires external dependencies.

from powerapi.processor.pre.k8s.actor import K8sProcessorState, K8sProcessorConfig
from powerapi.processor.pre.k8s.actor import K8sProcessorState
from powerapi.processor.pre.k8s.monitor_agent import K8sMonitorConfig
from powerapi.processor.pre.k8s.handlers import K8sPreProcessorActorHWPCReportHandler
from powerapi.processor.pre.k8s.metadata_cache_manager import K8sContainerMetadata
from powerapi.report import HWPCReport
Expand All @@ -50,7 +51,8 @@ def _create_handler() -> tuple[K8sPreProcessorActorHWPCReportHandler, list[HWPCR
actor = Mock(name='processor-actor')
actor.target_actors = [Mock(name='target_actor_a'), Mock(name='target_actor_b')]

state = K8sProcessorState(actor, K8sProcessorConfig('manual'))
monitor_config = K8sMonitorConfig('manual', 'https://localhost:6443', 'pytest-token')
state = K8sProcessorState(actor, monitor_config)
state.metadata_cache_manager = Mock(name='metadata_cache_manager')

handler = K8sPreProcessorActorHWPCReportHandler(state)
Expand Down
Loading