diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml index 2d361f4..b9194bd 100644 --- a/.semaphore/semaphore.yml +++ b/.semaphore/semaphore.yml @@ -9,7 +9,7 @@ global_job_config: prologue: commands: - checkout - - sem-version python 3.9 + - sem-version python 3.13 - pip install tox - COMMIT_MESSAGE_PREFIX="[ci skip] Publish version" blocks: diff --git a/confluent/docker_utils/__init__.py b/confluent/docker_utils/__init__.py index 908e448..b18716c 100644 --- a/confluent/docker_utils/__init__.py +++ b/confluent/docker_utils/__init__.py @@ -1,91 +1,143 @@ import base64 import os import subprocess +from enum import StrEnum import boto3 import docker -from compose.config.config import ConfigDetails, ConfigFile, load -from compose.container import Container -from compose.project import Project -from compose.service import ImageType -from compose.cli.docker_client import docker_client -from compose.config.environment import Environment +from .compose import ( + ComposeConfig, ComposeProject, ComposeContainer, + create_docker_client, ContainerStatus, STATE_KEY, STATUS_KEY, + Separators, VOLUME_BIND_MODE, VOLUME_READ_WRITE_MODE +) + + +DOCKER_TESTING_LABEL = "io.confluent.docker.testing" +TRUE_VALUE = "true" + + +class ECRKeys(StrEnum): + """AWS ECR service keys.""" + ECR_SERVICE = "ecr" + AUTH_DATA = "authorizationData" + AUTH_TOKEN = "authorizationToken" + PROXY_ENDPOINT = "proxyEndpoint" + + +BASH_C = "bash -c" +SUCCESS_TEXT = "success" +SUCCESS_BYTES = b"success" + +BUSYBOX_IMAGE = "busybox" +HOST_NETWORK = "host" +TMP_VOLUME = "/tmp:/tmp" + + +DOCKER_PREFIX = "DOCKER_" +REGISTRY_SUFFIX = "REGISTRY" +TAG_SUFFIX = "TAG" +DEFAULT_TAG = "latest" +UPSTREAM_SCOPE = "UPSTREAM" +TEST_SCOPE = "TEST" +SCOPE_SEPARATOR = "_" + + +class ContainerConfigKeys(StrEnum): + """Container configuration keys.""" + IMAGE = "image" + COMMAND = "command" + LABELS = "labels" + HOST_CONFIG = "host_config" + NETWORK_MODE = "NetworkMode" + BINDS = "Binds" + DETACH = "detach" + NETWORK_MODE_KEY = "network_mode" + VOLUMES = "volumes" + + +UTF8_ENCODING = "utf-8" +IGNORE_DECODE_ERRORS = "ignore" +DOCKER_STREAM_KEY = "stream" def api_client(): - return docker.from_env().api + """Get Docker client compatible with both legacy and new usage.""" + return docker.from_env() def ecr_login(): # see docker/docker-py#1677 - ecr = boto3.client('ecr') + ecr = boto3.client(ECRKeys.ECR_SERVICE) login = ecr.get_authorization_token() - b64token = login['authorizationData'][0]['authorizationToken'].encode('utf-8') - username, password = base64.b64decode(b64token).decode('utf-8').split(':') - registry = login['authorizationData'][0]['proxyEndpoint'] + b64token = login[ECRKeys.AUTH_DATA][0][ECRKeys.AUTH_TOKEN].encode(UTF8_ENCODING) + username, password = base64.b64decode(b64token).decode(UTF8_ENCODING).split(Separators.COLON) + registry = login[ECRKeys.AUTH_DATA][0][ECRKeys.PROXY_ENDPOINT] client = docker.from_env() client.login(username, password, registry=registry) def build_image(image_name, dockerfile_dir): - print("Building image %s from %s" % (image_name, dockerfile_dir)) + print(f"Building image {image_name} from {dockerfile_dir}") client = api_client() - output = client.build(dockerfile_dir, rm=True, tag=image_name) - response = "".join([" %s" % (line,) for line in output]) + image, build_logs = client.images.build(path=dockerfile_dir, rm=True, tag=image_name) + response = "".join([f" {line.get(DOCKER_STREAM_KEY, '')}" for line in build_logs if DOCKER_STREAM_KEY in line]) print(response) def image_exists(image_name): client = api_client() - tags = [t for image in client.images() for t in image['RepoTags'] or []] - return image_name in tags + try: + client.images.get(image_name) + return True + except docker.errors.ImageNotFound: + return False def pull_image(image_name): client = api_client() if not image_exists(image_name): - client.pull(image_name) + client.images.pull(image_name) def run_docker_command(timeout=None, **kwargs): - pull_image(kwargs["image"]) + pull_image(kwargs[ContainerConfigKeys.IMAGE]) client = api_client() - kwargs["labels"] = {"io.confluent.docker.testing": "true"} + kwargs[ContainerConfigKeys.LABELS] = {DOCKER_TESTING_LABEL: TRUE_VALUE} container = TestContainer.create(client, **kwargs) container.start() container.wait(timeout) logs = container.logs() - print("Running command %s: %s" % (kwargs["command"], logs)) + print(f"Running command {kwargs[ContainerConfigKeys.COMMAND]}: {logs}") container.shutdown() return logs def path_exists_in_image(image, path): - print("Checking for %s in %s" % (path, image)) - cmd = "bash -c '[ ! -e %s ] || echo success' " % (path,) + print(f"Checking for {path} in {image}") + cmd = f"{BASH_C} '[ ! -e {path} ] || echo {SUCCESS_TEXT}' " output = run_docker_command(image=image, command=cmd) - return b"success" in output + return SUCCESS_BYTES in output def executable_exists_in_image(image, path): - print("Checking for %s in %s" % (path, image)) - cmd = "bash -c '[ ! -x %s ] || echo success' " % (path,) + print(f"Checking for {path} in {image}") + cmd = f"{BASH_C} '[ ! -x {path} ] || echo {SUCCESS_TEXT}' " output = run_docker_command(image=image, command=cmd) - return b"success" in output + return SUCCESS_BYTES in output def run_command_on_host(command): logs = run_docker_command( - image="busybox", + image=BUSYBOX_IMAGE, command=command, - host_config={'NetworkMode': 'host', 'Binds': ['/tmp:/tmp']}) - print("Running command %s: %s" % (command, logs)) + host_config={ContainerConfigKeys.NETWORK_MODE: HOST_NETWORK, ContainerConfigKeys.BINDS: [TMP_VOLUME]}) + print(f"Running command {command}: {logs}") return logs def run_cmd(command): if command.startswith('"'): - cmd = "bash -c %s" % command + cmd = f"{BASH_C} {command}" else: cmd = command @@ -107,98 +159,168 @@ def add_registry_and_tag(image, scope=""): """ if scope: - scope += "_" - - return "{0}{1}:{2}".format(os.environ.get("DOCKER_{0}REGISTRY".format(scope), ""), - image, - os.environ.get("DOCKER_{0}TAG".format(scope), "latest") - ) - - -class TestContainer(Container): - + scope += SCOPE_SEPARATOR + + registry = os.environ.get(f"{DOCKER_PREFIX}{scope}{REGISTRY_SUFFIX}", "") + tag = os.environ.get(f"{DOCKER_PREFIX}{scope}{TAG_SUFFIX}", DEFAULT_TAG) + return f"{registry}{image}:{tag}" + + +class TestContainer(ComposeContainer): + """Extended container class for testing purposes.""" + + def __init__(self, container): + super().__init__(container) + + @classmethod + def create(cls, client, **kwargs): + """Create a new container using Docker SDK.""" + # Extract Docker SDK compatible parameters + image = kwargs.get(ContainerConfigKeys.IMAGE) + command = kwargs.get(ContainerConfigKeys.COMMAND) + labels = kwargs.get(ContainerConfigKeys.LABELS, {}) + host_config = kwargs.get(ContainerConfigKeys.HOST_CONFIG, {}) + + container_config = { + ContainerConfigKeys.IMAGE: image, + ContainerConfigKeys.COMMAND: command, + ContainerConfigKeys.LABELS: labels, + ContainerConfigKeys.DETACH: True, + } + + if host_config: + if ContainerConfigKeys.NETWORK_MODE in host_config: + container_config[ContainerConfigKeys.NETWORK_MODE_KEY] = host_config[ContainerConfigKeys.NETWORK_MODE] + if ContainerConfigKeys.BINDS in host_config: + volumes = {} + for bind in host_config[ContainerConfigKeys.BINDS]: + host_path, container_path = bind.split(Separators.COLON) + volumes[host_path] = {VOLUME_BIND_MODE: container_path, 'mode': VOLUME_READ_WRITE_MODE} + container_config[ContainerConfigKeys.VOLUMES] = volumes + + docker_container = client.containers.create(**container_config) + return cls(docker_container) + + def start(self): + """Start the container.""" + self.container.start() + def state(self): - return self.inspect_container["State"] + """Get container state information.""" + self.container.reload() + return self.container.attrs[STATE_KEY] def status(self): - return self.state()["Status"] + """Get container status.""" + return self.state()[STATUS_KEY] def shutdown(self): + """Stop and remove the container.""" self.stop() self.remove() def execute(self, command): - eid = self.create_exec(command) - return self.start_exec(eid) + """Execute a command in the container.""" + result = self.container.exec_run(command) + return result.output def wait(self, timeout): - return self.client.wait(self.id, timeout) + """Wait for the container to stop.""" + return self.container.wait(timeout=timeout) class TestCluster(): + """Test cluster management using modern Docker SDK.""" def __init__(self, name, working_dir, config_file): - config_file_path = os.path.join(working_dir, config_file) - cfg_file = ConfigFile.from_filename(config_file_path) - c = ConfigDetails(working_dir, [cfg_file],) - self.cd = load(c) self.name = name + self.config = ComposeConfig(working_dir, config_file) + self._project = None def get_project(self): - # Dont reuse the client to fix this bug : https://github.com/docker/compose/issues/1275 - client = docker_client(Environment()) - project = Project.from_config(self.name, self.cd, client) - return project + """Get the compose project, creating a new client each time to avoid issues.""" + # Create a new client each time to avoid reuse issues + client = create_docker_client() + self._project = ComposeProject(self.name, self.config, client) + return self._project def start(self): + """Start all services in the cluster.""" self.shutdown() self.get_project().up() def is_running(self): - state = [container.is_running for container in self.get_project().containers()] - return all(state) and len(state) > 0 + """Check if all services in the cluster are running.""" + containers = self.get_project().containers() + if not containers: + return False + return all(container.is_running for container in containers) def is_service_running(self, service_name): - return self.get_container(service_name).is_running + """Check if a specific service is running.""" + try: + return self.get_container(service_name).is_running + except RuntimeError: + return False def shutdown(self): + """Shutdown all services in the cluster.""" project = self.get_project() - project.down(ImageType.none, True, True) + project.down(remove_volumes=True, remove_orphans=True) project.remove_stopped() def get_container(self, service_name, stopped=False): + """Get a container for a specific service.""" + if stopped: + containers = self.get_project().containers([service_name], stopped=True) + if containers: + return containers[0] + raise RuntimeError(f"No container found for service '{service_name}'") return self.get_project().get_service(service_name).get_container() def exit_code(self, service_name): + """Get the exit code of a service container.""" containers = self.get_project().containers([service_name], stopped=True) - return containers[0].exit_code + if containers: + return containers[0].exit_code + return None def wait(self, service_name, timeout): - container = self.get_project().containers([service_name], stopped=True) - if container[0].is_running: - return self.get_project().client.wait(container[0].id, timeout) + """Wait for a service container to stop.""" + containers = self.get_project().containers([service_name], stopped=True) + if containers and containers[0].is_running: + return containers[0].wait(timeout) def run_command_on_service(self, service_name, command): + """Run a command on a specific service container.""" return self.run_command(command, self.get_container(service_name)) def service_logs(self, service_name, stopped=False): + """Get logs from a service container.""" if stopped: containers = self.get_project().containers([service_name], stopped=True) - print(containers[0].logs()) - return containers[0].logs() + if containers: + logs = containers[0].logs() + print(logs) + return logs + return b'' else: return self.get_container(service_name).logs() def run_command(self, command, container): - print("Running %s on %s :" % (command, container)) - eid = container.create_exec(command) - output = container.start_exec(eid) - print("\n%s " % output) + """Run a command on a container.""" + print(f"Running {command} on {container.name} :") + result = container.container.exec_run(command) + output = result.output + if isinstance(output, bytes): + print(f"\n{output.decode(UTF8_ENCODING, errors=IGNORE_DECODE_ERRORS)} ") + else: + print(f"\n{output} ") return output def run_command_on_all(self, command): + """Run a command on all containers in the cluster.""" results = {} for container in self.get_project().containers(): results[container.name_without_project] = self.run_command(command, container) - return results diff --git a/confluent/docker_utils/compose.py b/confluent/docker_utils/compose.py new file mode 100644 index 0000000..3f97857 --- /dev/null +++ b/confluent/docker_utils/compose.py @@ -0,0 +1,326 @@ +#!/usr/bin/env python +""" +Modern replacement for docker-compose Python library using Docker SDK directly. +Compatible with Python 3.10+ and maintains the same TestCluster interface. +""" + +import os +import yaml +import docker +from typing import Dict, List, Optional, Any +import time +from enum import Enum, StrEnum + + +class ContainerStatus(StrEnum): + """Container status constants.""" + RUNNING = "running" + EXITED = "exited" + + +class DockerComposeLabels(StrEnum): + """Docker Compose label constants.""" + PROJECT = "com.docker.compose.project" + SERVICE = "com.docker.compose.service" + + +class ComposeConfigKeys(StrEnum): + """Docker Compose configuration keys.""" + VERSION = "version" + SERVICES = "services" + IMAGE = "image" + BUILD = "build" + COMMAND = "command" + ENVIRONMENT = "environment" + PORTS = "ports" + VOLUMES = "volumes" + WORKING_DIR = "working_dir" + + +STATE_KEY = "State" +EXIT_CODE_KEY = "ExitCode" +ID_KEY = "Id" +STATUS_KEY = "Status" + +FILE_READ_MODE = "r" +VOLUME_READ_WRITE_MODE = "rw" +VOLUME_BIND_MODE = "bind" + +CURRENT_DIR_PREFIX = "./" + + +class Separators(StrEnum): + """Common string separators.""" + UNDERSCORE = "_" + COLON = ":" + EQUALS = "=" + + +class Defaults(StrEnum): + """Default configuration values.""" + COMPOSE_VERSION = "3" + CONTAINER_SUFFIX = "_1" + + +class ComposeConfig: + """Handles docker-compose.yml parsing and configuration management.""" + + def __init__(self, working_dir: str, config_file: str): + self.working_dir = working_dir + self.config_file_path = os.path.join(working_dir, config_file) + self.config = self._load_config() + + def _load_config(self) -> Dict[str, Any]: + """Load and parse docker-compose.yml file.""" + with open(self.config_file_path, FILE_READ_MODE) as f: + config = yaml.safe_load(f) + + if ComposeConfigKeys.VERSION not in config: + config[ComposeConfigKeys.VERSION] = Defaults.COMPOSE_VERSION + + if ComposeConfigKeys.SERVICES not in config: + raise ValueError("docker-compose.yml must contain 'services' section") + + return config + + def get_services(self) -> Dict[str, Dict[str, Any]]: + """Get all service definitions.""" + return self.config.get(ComposeConfigKeys.SERVICES, {}) + + def get_service(self, service_name: str) -> Dict[str, Any]: + """Get a specific service definition.""" + services = self.get_services() + if service_name not in services: + raise ValueError(f"Service '{service_name}' not found in compose file") + return services[service_name] + + +class ComposeContainer: + """Wrapper around Docker SDK container to provide compose-like interface.""" + + def __init__(self, container: docker.models.containers.Container): + self.container = container + self._service_name = None + + @property + def id(self) -> str: + return self.container.id + + @property + def name(self) -> str: + return self.container.name + + @property + def name_without_project(self) -> str: + """Extract service name from container name.""" + if self._service_name: + return self._service_name + # Container names usually follow pattern: projectname_servicename_1 + parts = self.name.split(Separators.UNDERSCORE) + if len(parts) >= 2: + return parts[1] + return self.name + + @property + def is_running(self) -> bool: + """Check if container is running.""" + self.container.reload() + return self.container.status == ContainerStatus.RUNNING + + @property + def exit_code(self) -> Optional[int]: + """Get container exit code.""" + self.container.reload() + if self.container.status == ContainerStatus.EXITED: + return self.container.attrs[STATE_KEY][EXIT_CODE_KEY] + return None + + def create_exec(self, command: str) -> str: + """Create an exec instance and return its ID.""" + exec_create_result = self.container.client.api.exec_create(self.container.id, command) + return exec_create_result[ID_KEY] + + def start_exec(self, exec_id: str) -> bytes: + """Start an exec instance by ID and return output.""" + output = self.container.client.api.exec_start(exec_id) + return output + + def logs(self) -> bytes: + """Get container logs.""" + return self.container.logs() + + def stop(self): + """Stop the container.""" + self.container.stop() + + def remove(self): + """Remove the container.""" + self.container.remove() + + +class ComposeProject: + """ + Replacement for docker-compose Project class using Docker SDK. + Provides similar interface for managing multi-container applications. + """ + + def __init__(self, name: str, config: ComposeConfig, client: docker.DockerClient): + self.name = name + self.config = config + self.client = client + self._containers = {} + + def up(self, services: Optional[List[str]] = None): + """Start all services (equivalent to docker-compose up).""" + services_to_start = services or list(self.config.get_services().keys()) + + for service_name in services_to_start: + self._start_service(service_name) + + def down(self, remove_images=None, remove_volumes=False, remove_orphans=False): + """Stop and remove all containers (equivalent to docker-compose down).""" + containers = self.containers() + + for container in containers: + try: + container.stop() + except Exception as e: + print(f"Error stopping container {container.name}: {e}") + + for container in containers: + try: + container.remove() + except Exception as e: + print(f"Error removing container {container.name}: {e}") + + def containers(self, service_names: Optional[List[str]] = None, stopped: bool = False) -> List[ComposeContainer]: + """Get containers for the project.""" + filters = { + 'label': f'{DockerComposeLabels.PROJECT}={self.name}' + } + + if not stopped: + filters['status'] = ContainerStatus.RUNNING + + docker_containers = self.client.containers.list(all=stopped, filters=filters) + compose_containers = [ComposeContainer(c) for c in docker_containers] + + if service_names: + filtered = [] + for container in compose_containers: + service_label = container.container.labels.get(DockerComposeLabels.SERVICE) + if service_label in service_names: + filtered.append(container) + return filtered + + return compose_containers + + def get_service(self, service_name: str): + """Get a service object.""" + return ComposeService(service_name, self) + + def remove_stopped(self): + """Remove stopped containers.""" + stopped_containers = self.containers(stopped=True) + for container in stopped_containers: + if not container.is_running: + try: + container.remove() + except Exception as e: + print(f"Error removing stopped container {container.name}: {e}") + + def _start_service(self, service_name: str): + """Start a specific service.""" + service_config = self.config.get_service(service_name) + + container_config = self._build_container_config(service_name, service_config) + + container_name = f"{self.name}{Separators.UNDERSCORE}{service_name}{Defaults.CONTAINER_SUFFIX}" + try: + existing = self.client.containers.get(container_name) + if existing.status != ContainerStatus.RUNNING: + existing.start() + return ComposeContainer(existing) + except docker.errors.NotFound: + pass + + container = self.client.containers.run( + name=container_name, + detach=True, + labels={ + DockerComposeLabels.PROJECT: self.name, + DockerComposeLabels.SERVICE: service_name, + }, + **container_config + ) + + return ComposeContainer(container) + + def _build_container_config(self, service_name: str, service_config: Dict[str, Any]) -> Dict[str, Any]: + """Build Docker SDK container configuration from compose service config.""" + config = {} + + if ComposeConfigKeys.IMAGE in service_config: + config[ComposeConfigKeys.IMAGE] = service_config[ComposeConfigKeys.IMAGE] + elif ComposeConfigKeys.BUILD in service_config: + raise NotImplementedError("Building images not implemented in this example") + + if ComposeConfigKeys.COMMAND in service_config: + config[ComposeConfigKeys.COMMAND] = service_config[ComposeConfigKeys.COMMAND] + + if ComposeConfigKeys.ENVIRONMENT in service_config: + env = service_config[ComposeConfigKeys.ENVIRONMENT] + if isinstance(env, list): + env_dict = {} + for item in env: + if Separators.EQUALS in item: + key, value = item.split(Separators.EQUALS, 1) + env_dict[key] = value + config[ComposeConfigKeys.ENVIRONMENT] = env_dict + else: + config[ComposeConfigKeys.ENVIRONMENT] = env + + if ComposeConfigKeys.PORTS in service_config: + ports = {} + for port_mapping in service_config[ComposeConfigKeys.PORTS]: + if Separators.COLON in str(port_mapping): + host_port, container_port = str(port_mapping).split(Separators.COLON, 1) + ports[container_port] = host_port + else: + ports[str(port_mapping)] = None + config[ComposeConfigKeys.PORTS] = ports + + if ComposeConfigKeys.VOLUMES in service_config: + volumes = {} + for volume in service_config[ComposeConfigKeys.VOLUMES]: + if Separators.COLON in volume: + host_path, container_path = volume.split(Separators.COLON, 1) + if host_path.startswith(CURRENT_DIR_PREFIX): + host_path = os.path.join(self.config.working_dir, host_path[2:]) + volumes[host_path] = {VOLUME_BIND_MODE: container_path, 'mode': VOLUME_READ_WRITE_MODE} + config[ComposeConfigKeys.VOLUMES] = volumes + + if ComposeConfigKeys.WORKING_DIR in service_config: + config[ComposeConfigKeys.WORKING_DIR] = service_config[ComposeConfigKeys.WORKING_DIR] + + return config + + +class ComposeService: + """Represents a service in the compose project.""" + + def __init__(self, name: str, project: ComposeProject): + self.name = name + self.project = project + + def get_container(self) -> ComposeContainer: + """Get the container for this service.""" + containers = self.project.containers([self.name]) + if not containers: + raise RuntimeError(f"No running container found for service '{self.name}'") + return containers[0] + + +def create_docker_client() -> docker.DockerClient: + """Create a Docker client similar to the old compose.cli.docker_client.""" + return docker.from_env() diff --git a/requirements.txt b/requirements.txt index 80f316e..bdb7fbe 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ boto3==1.36.6 docker==7.1.0 -docker-compose==1.29.2 Jinja2==3.1.6 -requests==2.32.4 - +PyYAML==6.0.2 +requests==2.32.4 \ No newline at end of file