From ed28bf72bb763f0fff0be3dd811a2a37ff6f6281 Mon Sep 17 00:00:00 2001 From: Petya Slavova Date: Tue, 2 Dec 2025 16:26:18 +0200 Subject: [PATCH] Fixing RedisCluster client instantiation. Ruff autoformatting applied. --- Makefile | 34 +-- README.md | 2 +- cli.py | 594 ++++++++++++++++++++++++++++++++++------------- config.py | 135 ++++++----- redis_client.py | 302 ++++++++++++++---------- requirements.txt | 4 +- 6 files changed, 712 insertions(+), 359 deletions(-) diff --git a/Makefile b/Makefile index 2ce4944..6c3ef4f 100644 --- a/Makefile +++ b/Makefile @@ -1,12 +1,12 @@ # Redis Python Test App - Makefile -.PHONY: help install-python39 install-deps-venv test test-connection build clean +.PHONY: help install-python310 install-deps-venv test test-connection build clean # Default target help: @echo "Redis Python Test App - Available Commands:" @echo "" @echo "🚀 Development Commands:" - @echo " make install-python39 - Install Python 3.9 on Ubuntu/Debian systems" + @echo " make install-python310 - Install Python 3.10 on Ubuntu/Debian systems" @echo " make install-deps-venv - Create virtual environment and install dependencies" @echo " make test - Run basic test (60 seconds)" @echo " make test-connection - Test Redis connection" @@ -18,8 +18,8 @@ help: @echo " make clean - Clean up Python cache and virtual environment" @echo "" @echo "📋 Prerequisites:" - @echo " • Python 3.9+ installed (required for redis==7.0.0b1)" - @echo " • Run 'make install-python39' to install Python 3.9 on Ubuntu/Debian" + @echo " • Python 3.10+ installed (required for redis>=7.1.0)" + @echo " • Run 'make install-python310' to install Python 3.10 on Ubuntu/Debian" @echo " • Run 'make install-deps-venv' to set up virtual environment" @echo " • Redis Metrics Stack must be running (separate repository)" @echo " • Redis accessible at localhost:6379" @@ -33,27 +33,27 @@ IMAGE_TAG ?= latest # Development Commands #============================================================================== 
-install-python39: ## Install Python 3.9 on Ubuntu/Debian systems - @echo "🐍 Installing Python 3.9..." - @if command -v python3.9 >/dev/null 2>&1; then \ - echo "✓ Python 3.9 already installed"; \ - python3.9 --version; \ +install-python310: ## Install Python 3.10 on Ubuntu/Debian systems + @echo "🐍 Installing Python 3.10..." + @if command -v python3.10 >/dev/null 2>&1; then \ + echo "✓ Python 3.10 already installed"; \ + python3.10 --version; \ else \ - echo "📦 Installing Python 3.9 and required packages..."; \ + echo "📦 Installing Python 3.10 and required packages..."; \ sudo apt update; \ - sudo apt install -y python3.9 python3.9-venv python3.9-dev python3.9-distutils; \ - echo "✅ Python 3.9 installation complete"; \ - python3.9 --version; \ + sudo apt install -y python3.10 python3.10-venv python3.10-dev python3.10-distutils; \ + echo "✅ Python 3.10 installation complete"; \ + python3.10 --version; \ fi install-deps-venv: ## Create virtual environment and install dependencies @echo "📦 Setting up Python virtual environment..." @if [ ! -d "venv" ]; then \ - echo "🔧 Creating virtual environment with Python 3.9..."; \ - if command -v python3.9 >/dev/null 2>&1; then \ - python3.9 -m venv venv; \ + echo "🔧 Creating virtual environment with Python 3.10..."; \ + if command -v python3.10 >/dev/null 2>&1; then \ + python3.10 -m venv venv; \ else \ - echo "❌ Python 3.9 not found. Run 'make install-python39' first."; \ + echo "❌ Python 3.10 not found. 
Run 'make install-python310' first."; \ exit 1; \ fi; \ else \ diff --git a/README.md b/README.md index c6f9bcb..758d4df 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ A high-performance Redis load testing application built with Python and redis-py ### Prerequisites -- **Python 3.8+** with pip +- **Python 3.10+** with pip - **Redis server** running (localhost:6379 by default) - **Redis Metrics Stack** (optional, for observability - separate repository) diff --git a/cli.py b/cli.py index 0ea0b1a..d7c32b6 100644 --- a/cli.py +++ b/cli.py @@ -1,32 +1,60 @@ """ Command-line interface for Redis load testing application. """ + +from typing import Any import click import sys import json import os from dotenv import load_dotenv -from config import RunnerConfig, TestConfig, RedisConnectionConfig, WorkloadConfig, WorkloadProfiles, get_redis_version +from config import ( + RunnerConfig, + TestConfig, + RedisConnectionConfig, + WorkloadProfiles, + get_redis_version, +) from test_runner import TestRunner # Load environment variables from .env file load_dotenv() -def get_env_or_default(env_var: str, default_value, value_type=str): +class BoolOrAutoType(click.ParamType): + """Custom Click type that accepts bool or literal 'auto' string.""" + + name = "bool|auto" + + def convert(self, value, param, ctx): + if isinstance(value, str): + lower_value = value.lower() + if lower_value == "auto": + return "auto" + if lower_value in ("true", "1", "yes", "on"): + return True + if lower_value in ("false", "0", "no", "off"): + return False + self.fail(f"{value} is not a valid bool or 'auto'", param, ctx) + return value + + +def get_env_or_default(env_var: str, default_value: Any, value_type: type = str): """Get environment variable with type conversion and default fallback.""" env_value = os.getenv(env_var) if env_value is None: return default_value - + try: - if value_type == bool: - return env_value.lower() in ('true', '1', 'yes', 'on') - elif value_type == int: + if value_type 
is bool: + return env_value.lower() in ("true", "1", "yes", "on") + elif value_type is int: return int(env_value) - elif value_type == float: + elif value_type is float: return float(env_value) + elif value_type is BoolOrAutoType: + return BoolOrAutoType().convert(env_value, None, None) else: return env_value except (ValueError, TypeError): @@ -49,14 +77,14 @@ def list_profiles(): workload = WorkloadProfiles.get_profile(profile) operations = workload.get_option("operations", [workload.type]) if isinstance(operations, list): - ops_str = ', '.join(operations) + ops_str = ", ".join(operations) else: ops_str = workload.type click.echo(f" {profile}: {ops_str}") @cli.command() -@click.argument('profile_name', type=click.Choice(WorkloadProfiles.list_profiles())) +@click.argument("profile_name", type=click.Choice(WorkloadProfiles.list_profiles())) def describe_profile(profile_name): """Describe a specific workload profile.""" workload = WorkloadProfiles.get_profile(profile_name) @@ -112,107 +140,352 @@ def describe_profile(profile_name): # ============================================================================ # Redis Connection Parameters # ============================================================================ -@click.option('--host', default=lambda: get_env_or_default('REDIS_HOST', 'localhost'), help='Redis host') -@click.option('--port', type=int, default=lambda: get_env_or_default('REDIS_PORT', 6379, int), help='Redis port') -@click.option('--password', default=lambda: get_env_or_default('REDIS_PASSWORD', None), help='Redis password') -@click.option('--db', type=int, default=lambda: get_env_or_default('REDIS_DB', 0, int), help='Redis database number') -@click.option('--cluster-enabled', type=bool, default=lambda: get_env_or_default('REDIS_CLUSTER', False, bool), help='Use Redis Cluster mode') -@click.option('--cluster-nodes', default=lambda: get_env_or_default('REDIS_CLUSTER_NODES', None), help='Comma-separated list of cluster nodes (host:port)') 
-@click.option('--ssl-enabled', type=bool, default=lambda: get_env_or_default('REDIS_SSL_ENABLED', False, bool), help='Use SSL/TLS connection') -@click.option('--ssl-keyfile', default=lambda: get_env_or_default('REDIS_SSL_KEYFILE', None), help='Path to client private key file') -@click.option('--ssl-certfile', default=lambda: get_env_or_default('REDIS_SSL_CERTFILE', None), help='Path to client certificate file') -@click.option('--ssl-cert-reqs', default=lambda: get_env_or_default('REDIS_SSL_CERT_REQS', 'required'), type=click.Choice(['none', 'optional', 'required']), help='SSL certificate requirements') -@click.option('--ssl-ca-certs', default=lambda: get_env_or_default('REDIS_SSL_CA_CERTS', None), help='Path to CA certificates file') -@click.option('--ssl-ca-path', default=lambda: get_env_or_default('REDIS_SSL_CA_PATH', None), help='Path to directory containing CA certificates') -@click.option('--ssl-ca-data', default=lambda: get_env_or_default('REDIS_SSL_CA_DATA', None), help='CA certificate data as string') -@click.option('--ssl-check-hostname', type=bool, default=lambda: get_env_or_default('REDIS_SSL_CHECK_HOSTNAME', True, bool), help='Check SSL hostname') -@click.option('--ssl-password', default=lambda: get_env_or_default('REDIS_SSL_PASSWORD', None), help='Password for SSL private key') -@click.option('--ssl-min-version', default=lambda: get_env_or_default('REDIS_SSL_MIN_VERSION', 'TLSv1_2'), help='Minimum SSL/TLS version (TLSv1, TLSv1_1, TLSv1_2, TLSv1_3 or 1.0, 1.1, 1.2, 1.3). 
Default: TLSv1_2 for Redis Enterprise compatibility') -@click.option('--ssl-ciphers', default=lambda: get_env_or_default('REDIS_SSL_CIPHERS', None), help='SSL cipher suite') -@click.option('--socket-timeout', type=float, default=lambda: get_env_or_default('REDIS_SOCKET_TIMEOUT', None), help='Socket timeout in seconds') -@click.option('--socket-connect-timeout', type=float, default=lambda: get_env_or_default('REDIS_SOCKET_CONNECT_TIMEOUT', None), help='Socket connect timeout in seconds') -@click.option('--max-connections', type=int, default=lambda: get_env_or_default('REDIS_MAX_CONNECTIONS', 50, int), help='Maximum connections per client') -@click.option('--client-retry-attempts', type=int, default=lambda: get_env_or_default('REDIS_CLIENT_RETRY_ATTEMPTS', 3, int), help='Number of client-level retry attempts for network/connection issues (uses redis-py Retry class)') -@click.option('--maintenance-notifications-enabled', type=bool, default=lambda: get_env_or_default('REDIS_MAINT_NOTIFICATIONS_ENABLED', True, bool), help='Server maintenance events (hitless upgrades push notifications)') -@click.option('--maintenance-relaxed-timeout', type=float, default=lambda: get_env_or_default('REDIS_MAINT_RELAXED_TIMEOUT', None), help='Relaxedimeout during maintenance events') -@click.option('--protocol',type=int, default=lambda: get_env_or_default('REDIS_PROTOCOL', 3, int), help='RESP Version (2 or 3)') - +@click.option( + "--host", + default=lambda: get_env_or_default("REDIS_HOST", "localhost"), + help="Redis host", +) +@click.option( + "--port", + type=int, + default=lambda: get_env_or_default("REDIS_PORT", 6379, int), + help="Redis port", +) +@click.option( + "--password", + default=lambda: get_env_or_default("REDIS_PASSWORD", None), + help="Redis password", +) +@click.option( + "--db", + type=int, + default=lambda: get_env_or_default("REDIS_DB", 0, int), + help="Redis database number", +) +@click.option( + "--cluster-enabled", + type=bool, + default=lambda: 
get_env_or_default("REDIS_CLUSTER", False, bool), + help="Use Redis Cluster mode", +) +@click.option( + "--cluster-nodes", + default=lambda: get_env_or_default("REDIS_CLUSTER_NODES", None), + help="Comma-separated list of cluster nodes (host:port)", +) +@click.option( + "--ssl-enabled", + type=bool, + default=lambda: get_env_or_default("REDIS_SSL_ENABLED", False, bool), + help="Use SSL/TLS connection", +) +@click.option( + "--ssl-keyfile", + default=lambda: get_env_or_default("REDIS_SSL_KEYFILE", None), + help="Path to client private key file", +) +@click.option( + "--ssl-certfile", + default=lambda: get_env_or_default("REDIS_SSL_CERTFILE", None), + help="Path to client certificate file", +) +@click.option( + "--ssl-cert-reqs", + default=lambda: get_env_or_default("REDIS_SSL_CERT_REQS", "required"), + type=click.Choice(["none", "optional", "required"]), + help="SSL certificate requirements", +) +@click.option( + "--ssl-ca-certs", + default=lambda: get_env_or_default("REDIS_SSL_CA_CERTS", None), + help="Path to CA certificates file", +) +@click.option( + "--ssl-ca-path", + default=lambda: get_env_or_default("REDIS_SSL_CA_PATH", None), + help="Path to directory containing CA certificates", +) +@click.option( + "--ssl-ca-data", + default=lambda: get_env_or_default("REDIS_SSL_CA_DATA", None), + help="CA certificate data as string", +) +@click.option( + "--ssl-check-hostname", + type=bool, + default=lambda: get_env_or_default("REDIS_SSL_CHECK_HOSTNAME", True, bool), + help="Check SSL hostname", +) +@click.option( + "--ssl-password", + default=lambda: get_env_or_default("REDIS_SSL_PASSWORD", None), + help="Password for SSL private key", +) +@click.option( + "--ssl-min-version", + default=lambda: get_env_or_default("REDIS_SSL_MIN_VERSION", "TLSv1_2"), + help="Minimum SSL/TLS version (TLSv1, TLSv1_1, TLSv1_2, TLSv1_3 or 1.0, 1.1, 1.2, 1.3). 
Default: TLSv1_2 for Redis Enterprise compatibility", +) +@click.option( + "--ssl-ciphers", + default=lambda: get_env_or_default("REDIS_SSL_CIPHERS", None), + help="SSL cipher suite", +) +@click.option( + "--socket-timeout", + type=float, + default=lambda: get_env_or_default("REDIS_SOCKET_TIMEOUT", None), + help="Socket timeout in seconds", +) +@click.option( + "--socket-connect-timeout", + type=float, + default=lambda: get_env_or_default("REDIS_SOCKET_CONNECT_TIMEOUT", None), + help="Socket connect timeout in seconds", +) +@click.option( + "--max-connections", + type=int, + default=lambda: get_env_or_default("REDIS_MAX_CONNECTIONS", 50, int), + help="Maximum connections per client", +) +@click.option( + "--client-retry-attempts", + type=int, + default=lambda: get_env_or_default("REDIS_CLIENT_RETRY_ATTEMPTS", 3, int), + help="Number of client-level retry attempts for network/connection issues (uses redis-py Retry class)", +) +@click.option( + "--maintenance-notifications-enabled", + type=BoolOrAutoType(), + default=lambda: get_env_or_default( + "REDIS_MAINT_NOTIFICATIONS_ENABLED", True, BoolOrAutoType + ), + help="Server maintenance events (hitless upgrades push notifications). 
Accepts: true, false, or 'auto'", +) +@click.option( + "--maintenance-relaxed-timeout", + type=float, + default=lambda: get_env_or_default("REDIS_MAINT_RELAXED_TIMEOUT", None), + help="Relaxed timeout during maintenance events", +) +@click.option( + "--protocol", + type=int, + default=lambda: get_env_or_default("REDIS_PROTOCOL", 3, int), + help="RESP Version (2 or 3)", +) # ============================================================================ # Test Configuration Parameters # ============================================================================ -@click.option('--duration', type=int, default=lambda: get_env_or_default('TEST_DURATION', None, int), help='Test duration in seconds (unlimited if not specified)') -@click.option('--target-ops-per-second', type=int, default=lambda: get_env_or_default('TEST_TARGET_OPS_PER_SECOND', None, int), help='Target operations per second') -@click.option('--clients', type=int, default=lambda: get_env_or_default('TEST_CLIENT_INSTANCES', 4, int), help='Number of Redis clients') -@click.option('--threads-per-client', type=int, default=lambda: get_env_or_default('TEST_THREADS_PER_CLIENT', 10, int), help='Number of worker threads per Redis client') - +@click.option( + "--duration", + type=int, + default=lambda: get_env_or_default("TEST_DURATION", None, int), + help="Test duration in seconds (unlimited if not specified)", +) +@click.option( + "--target-ops-per-second", + type=int, + default=lambda: get_env_or_default("TEST_TARGET_OPS_PER_SECOND", None, int), + help="Target operations per second", +) +@click.option( + "--clients", + type=int, + default=lambda: get_env_or_default("TEST_CLIENT_INSTANCES", 4, int), + help="Number of Redis clients", +) +@click.option( + "--threads-per-client", + type=int, + default=lambda: get_env_or_default("TEST_THREADS_PER_CLIENT", 10, int), + help="Number of worker threads per Redis client", +) # ============================================================================ # Workload Configuration 
Parameters # ============================================================================ -@click.option('--workload-profile', type=click.Choice(WorkloadProfiles.list_profiles()), default=lambda: get_env_or_default('TEST_WORKLOAD_PROFILE', None), help='Pre-defined workload profile') -@click.option('--operations', default=lambda: get_env_or_default('TEST_OPERATIONS', None), help='Comma-separated list of Redis operations') -@click.option('--operation-weights', default=lambda: get_env_or_default('TEST_OPERATION_WEIGHTS', None), help='JSON string of operation weights (e.g., {"SET": 0.4, "GET": 0.6})') -@click.option('--key-prefix', default=lambda: get_env_or_default('TEST_KEY_PREFIX', 'test_key'), help='Prefix for generated keys') -@click.option('--key-range', type=int, default=lambda: get_env_or_default('TEST_KEY_RANGE', 10000, int), help='Range of key IDs to use') -@click.option('--read-write-ratio', type=float, default=lambda: get_env_or_default('TEST_READ_WRITE_RATIO', 0.7, float), help='Ratio of read operations (0.0-1.0)') -@click.option('--value-size', type=int, default=lambda: get_env_or_default('TEST_VALUE_SIZE', None, int), help='Fixed value size in bytes (overrides min/max)') -@click.option('--value-size-min', type=int, default=lambda: get_env_or_default('TEST_VALUE_SIZE_MIN', 100, int), help='Minimum value size in bytes') -@click.option('--value-size-max', type=int, default=lambda: get_env_or_default('TEST_VALUE_SIZE_MAX', 1000, int), help='Maximum value size in bytes') - +@click.option( + "--workload-profile", + type=click.Choice(WorkloadProfiles.list_profiles()), + default=lambda: get_env_or_default("TEST_WORKLOAD_PROFILE", None), + help="Pre-defined workload profile", +) +@click.option( + "--operations", + default=lambda: get_env_or_default("TEST_OPERATIONS", None), + help="Comma-separated list of Redis operations", +) +@click.option( + "--operation-weights", + default=lambda: get_env_or_default("TEST_OPERATION_WEIGHTS", None), + help='JSON string of 
operation weights (e.g., {"SET": 0.4, "GET": 0.6})', +) +@click.option( + "--key-prefix", + default=lambda: get_env_or_default("TEST_KEY_PREFIX", "test_key"), + help="Prefix for generated keys", +) +@click.option( + "--key-range", + type=int, + default=lambda: get_env_or_default("TEST_KEY_RANGE", 10000, int), + help="Range of key IDs to use", +) +@click.option( + "--read-write-ratio", + type=float, + default=lambda: get_env_or_default("TEST_READ_WRITE_RATIO", 0.7, float), + help="Ratio of read operations (0.0-1.0)", +) +@click.option( + "--value-size", + type=int, + default=lambda: get_env_or_default("TEST_VALUE_SIZE", None, int), + help="Fixed value size in bytes (overrides min/max)", +) +@click.option( + "--value-size-min", + type=int, + default=lambda: get_env_or_default("TEST_VALUE_SIZE_MIN", 100, int), + help="Minimum value size in bytes", +) +@click.option( + "--value-size-max", + type=int, + default=lambda: get_env_or_default("TEST_VALUE_SIZE_MAX", 1000, int), + help="Maximum value size in bytes", +) # ============================================================================ # Pipeline & Advanced Parameters # ============================================================================ -@click.option('--use-pipeline', type=bool, default=lambda: get_env_or_default('TEST_USE_PIPELINE', False, bool), help='Use Redis pipelining') -@click.option('--pipeline-size', type=int, default=lambda: get_env_or_default('TEST_PIPELINE_SIZE', 10, int), help='Number of operations per pipeline') -@click.option('--async-mode', type=bool, default=lambda: get_env_or_default('TEST_ASYNC_MODE', False, bool), help='Use asynchronous operations') -@click.option('--transaction-size', type=int, default=lambda: get_env_or_default('TEST_TRANSACTION_SIZE', 5, int), help='Number of operations per transaction') -@click.option('--pubsub-channels', default=lambda: get_env_or_default('TEST_PUBSUB_CHANNELS', None), help='Comma-separated list of pub/sub channels') - +@click.option( + 
"--use-pipeline", + type=bool, + default=lambda: get_env_or_default("TEST_USE_PIPELINE", False, bool), + help="Use Redis pipelining", +) +@click.option( + "--pipeline-size", + type=int, + default=lambda: get_env_or_default("TEST_PIPELINE_SIZE", 10, int), + help="Number of operations per pipeline", +) +@click.option( + "--async-mode", + type=bool, + default=lambda: get_env_or_default("TEST_ASYNC_MODE", False, bool), + help="Use asynchronous operations", +) +@click.option( + "--transaction-size", + type=int, + default=lambda: get_env_or_default("TEST_TRANSACTION_SIZE", 5, int), + help="Number of operations per transaction", +) +@click.option( + "--pubsub-channels", + default=lambda: get_env_or_default("TEST_PUBSUB_CHANNELS", None), + help="Comma-separated list of pub/sub channels", +) # ============================================================================ # Logging & Output Parameters # ============================================================================ -@click.option('--log-level', default=lambda: get_env_or_default('LOG_LEVEL', 'INFO'), type=click.Choice(['DEBUG', 'INFO', 'WARNING', 'ERROR']), help='Logging level') -@click.option('--log-file', default=lambda: get_env_or_default('LOG_FILE', None), help='Log file path') -@click.option('--output-file', default=lambda: get_env_or_default('OUTPUT_FILE', None), help='Output file for final test summary (JSON). If not provided, prints to stdout.') -@click.option('--quiet', type=bool, default=False, help='Suppress periodic stats output') - +@click.option( + "--log-level", + default=lambda: get_env_or_default("LOG_LEVEL", "INFO"), + type=click.Choice(["DEBUG", "INFO", "WARNING", "ERROR"]), + help="Logging level", +) +@click.option( + "--log-file", + default=lambda: get_env_or_default("LOG_FILE", None), + help="Log file path", +) +@click.option( + "--output-file", + default=lambda: get_env_or_default("OUTPUT_FILE", None), + help="Output file for final test summary (JSON). 
If not provided, prints to stdout.", +) +@click.option( + "--quiet", type=bool, default=False, help="Suppress periodic stats output" +) # ============================================================================ # OpenTelemetry & Metrics Parameters # ============================================================================ -@click.option('--otel-endpoint', default=lambda: get_env_or_default('OTEL_EXPORTER_OTLP_ENDPOINT', None), help='OpenTelemetry OTLP endpoint') -@click.option('--otel-service-name', default=lambda: get_env_or_default('OTEL_SERVICE_NAME', 'redis-load-test'), help='OpenTelemetry service name') -@click.option('--otel-export-interval', type=int, default=lambda: get_env_or_default('OTEL_EXPORT_INTERVAL', 5000, int), help='OpenTelemetry export interval in milliseconds') -@click.option('--metrics-interval', type=int, default=lambda: get_env_or_default('METRICS_INTERVAL', 5, int), help='Metrics reporting interval in seconds') - +@click.option( + "--otel-endpoint", + default=lambda: get_env_or_default("OTEL_EXPORTER_OTLP_ENDPOINT", None), + help="OpenTelemetry OTLP endpoint", +) +@click.option( + "--otel-service-name", + default=lambda: get_env_or_default("OTEL_SERVICE_NAME", "redis-load-test"), + help="OpenTelemetry service name", +) +@click.option( + "--otel-export-interval", + type=int, + default=lambda: get_env_or_default("OTEL_EXPORT_INTERVAL", 5000, int), + help="OpenTelemetry export interval in milliseconds", +) +@click.option( + "--metrics-interval", + type=int, + default=lambda: get_env_or_default("METRICS_INTERVAL", 5, int), + help="Metrics reporting interval in seconds", +) # ============================================================================ # Application Identification Parameters # ============================================================================ -@click.option('--app-name', default=lambda: get_env_or_default('APP_NAME', 'python'), help='Application name for multi-app filtering (python, go, java, etc.)') 
-@click.option('--instance-id', default=lambda: get_env_or_default('INSTANCE_ID', None), help='Unique instance identifier (auto-generated if not provided)') -@click.option('--run-id', default=lambda: get_env_or_default('RUN_ID', None), help='Unique run identifier (auto-generated if not provided)') -@click.option('--version', default=lambda: get_env_or_default('VERSION', None), help='Version identifier (defaults to redis-py package version)') - +@click.option( + "--app-name", + default=lambda: get_env_or_default("APP_NAME", "python"), + help="Application name for multi-app filtering (python, go, java, etc.)", +) +@click.option( + "--instance-id", + default=lambda: get_env_or_default("INSTANCE_ID", None), + help="Unique instance identifier (auto-generated if not provided)", +) +@click.option( + "--run-id", + default=lambda: get_env_or_default("RUN_ID", None), + help="Unique run identifier (auto-generated if not provided)", +) +@click.option( + "--version", + default=lambda: get_env_or_default("VERSION", None), + help="Version identifier (defaults to redis-py package version)", +) # ============================================================================ # Configuration File Parameters # ============================================================================ -@click.option('--config-file', default=lambda: get_env_or_default('CONFIG_FILE', None), help='Load configuration from YAML/JSON file') -@click.option('--save-config', help='Save current configuration to file') +@click.option( + "--config-file", + default=lambda: get_env_or_default("CONFIG_FILE", None), + help="Load configuration from YAML/JSON file", +) +@click.option("--save-config", help="Save current configuration to file") def run(**kwargs): """Run Redis load test with specified configuration.""" try: # Load configuration from file if specified - if kwargs['config_file']: + if kwargs["config_file"]: from config import load_config_from_file - config = load_config_from_file(kwargs['config_file']) + + 
config = load_config_from_file(kwargs["config_file"]) click.echo(f"Loaded configuration from {kwargs['config_file']}") else: # Build configuration from command line arguments config = _build_config_from_args(kwargs) # Save configuration if requested - if kwargs['save_config']: + if kwargs["save_config"]: from config import save_config_to_file - save_config_to_file(config, kwargs['save_config']) + + save_config_to_file(config, kwargs["save_config"]) click.echo(f"Configuration saved to {kwargs['save_config']}") return @@ -230,21 +503,23 @@ def run(**kwargs): click.echo(f"Error: {e}", err=True) sys.exit(1) + @cli.command() def test_connection(): """Test Redis connection with current configuration.""" try: # Build minimal config for connection test redis_config = RedisConnectionConfig( - host=get_env_or_default('REDIS_HOST', 'localhost'), - port=get_env_or_default('REDIS_PORT', 6379, int), - password=get_env_or_default('REDIS_PASSWORD', None), - database=get_env_or_default('REDIS_DB', 0, "int"), - cluster_mode=get_env_or_default('REDIS_CLUSTER', False, bool), - ssl=get_env_or_default('REDIS_SSL', False, bool) + host=get_env_or_default("REDIS_HOST", "localhost"), + port=get_env_or_default("REDIS_PORT", 6379, int), + password=get_env_or_default("REDIS_PASSWORD", None), + database=get_env_or_default("REDIS_DB", 0, "int"), + cluster_mode=get_env_or_default("REDIS_CLUSTER", False, bool), + ssl=get_env_or_default("REDIS_SSL", False, bool), ) from redis_client import RedisClient + client = RedisClient(redis_config) # Test basic operations @@ -253,7 +528,9 @@ def test_connection(): click.echo("✓ Redis connection successful!") click.echo(f"Redis version: {info.get('redis_version', 'unknown')}") - click.echo(f"Redis mode: {'cluster' if redis_config.cluster_mode else 'standalone'}") + click.echo( + f"Redis mode: {'cluster' if redis_config.cluster_mode else 'standalone'}" + ) click.echo(f"Connected clients: {info.get('connected_clients', 'unknown')}") client.close() @@ -268,94 
+545,94 @@ def _build_config_from_args(kwargs) -> RunnerConfig: # Parse cluster nodes cluster_nodes = [] - if kwargs['cluster_nodes']: - for node in kwargs['cluster_nodes'].split(','): - host, port = node.strip().split(':') - cluster_nodes.append({'host': host, 'port': int(port)}) + if kwargs["cluster_nodes"]: + for node in kwargs["cluster_nodes"].split(","): + host, port = node.strip().split(":") + cluster_nodes.append({"host": host, "port": int(port)}) # Parse operation weights operation_weights = {} - if kwargs['operation_weights']: - operation_weights = json.loads(kwargs['operation_weights']) + if kwargs["operation_weights"]: + operation_weights = json.loads(kwargs["operation_weights"]) # Parse pub/sub channels pubsub_channels = [] - if kwargs['pubsub_channels']: - pubsub_channels = [ch.strip() for ch in kwargs['pubsub_channels'].split(',')] + if kwargs["pubsub_channels"]: + pubsub_channels = [ch.strip() for ch in kwargs["pubsub_channels"].split(",")] # Build Redis connection config redis_config = RedisConnectionConfig( - host=kwargs['host'], - port=kwargs['port'], - password=kwargs['password'], - database=kwargs['db'], - cluster_mode=kwargs['cluster_enabled'], + host=kwargs["host"], + port=kwargs["port"], + password=kwargs["password"], + database=kwargs["db"], + cluster_mode=kwargs["cluster_enabled"], cluster_nodes=cluster_nodes, - ssl=kwargs['ssl_enabled'], - ssl_keyfile=kwargs['ssl_keyfile'], - ssl_certfile=kwargs['ssl_certfile'], - ssl_cert_reqs=kwargs['ssl_cert_reqs'], - ssl_ca_certs=kwargs['ssl_ca_certs'], - ssl_ca_path=kwargs['ssl_ca_path'], - ssl_ca_data=kwargs['ssl_ca_data'], - ssl_check_hostname=kwargs['ssl_check_hostname'], - ssl_password=kwargs['ssl_password'], - ssl_min_version=kwargs['ssl_min_version'], - ssl_ciphers=kwargs['ssl_ciphers'], - socket_timeout=kwargs['socket_timeout'], - socket_connect_timeout=kwargs['socket_connect_timeout'], - max_connections=kwargs['max_connections'], - client_retry_attempts=kwargs['client_retry_attempts'], - 
maintenance_notifications_enabled=kwargs['maintenance_notifications_enabled'], - maintenance_relaxed_timeout=kwargs['maintenance_relaxed_timeout'] + ssl=kwargs["ssl_enabled"], + ssl_keyfile=kwargs["ssl_keyfile"], + ssl_certfile=kwargs["ssl_certfile"], + ssl_cert_reqs=kwargs["ssl_cert_reqs"], + ssl_ca_certs=kwargs["ssl_ca_certs"], + ssl_ca_path=kwargs["ssl_ca_path"], + ssl_ca_data=kwargs["ssl_ca_data"], + ssl_check_hostname=kwargs["ssl_check_hostname"], + ssl_password=kwargs["ssl_password"], + ssl_min_version=kwargs["ssl_min_version"], + ssl_ciphers=kwargs["ssl_ciphers"], + socket_timeout=kwargs["socket_timeout"], + socket_connect_timeout=kwargs["socket_connect_timeout"], + max_connections=kwargs["max_connections"], + client_retry_attempts=kwargs["client_retry_attempts"], + maintenance_notifications_enabled=kwargs["maintenance_notifications_enabled"], + maintenance_relaxed_timeout=kwargs["maintenance_relaxed_timeout"], ) - # Build workload config workload_config = WorkloadProfiles.get_profile("basic_rw") # If a profile is specified, use it. 
If any additional options have been specified, they will override the defaults - if kwargs['workload_profile']: - workload_config = WorkloadProfiles.get_profile(kwargs['workload_profile']) + if kwargs["workload_profile"]: + workload_config = WorkloadProfiles.get_profile(kwargs["workload_profile"]) - - if kwargs['operations']: - operations = [op.strip() for op in kwargs['operations'].split(',')] + if kwargs["operations"]: + operations = [op.strip() for op in kwargs["operations"].split(",")] workload_config.options["operations"] = operations else: workload_config.options["operations"] = workload_config.get_option("operations") # Build options dictionary - workload_config.options["keyPrefix"] = kwargs['key_prefix'] or workload_config.get_option("keyPrefix") + workload_config.options["keyPrefix"] = kwargs[ + "key_prefix" + ] or workload_config.get_option("keyPrefix") - if kwargs['key_range'] is not None: - workload_config.options["keyRange"] = kwargs['key_range'] + if kwargs["key_range"] is not None: + workload_config.options["keyRange"] = kwargs["key_range"] - if kwargs['read_write_ratio'] is not None: - workload_config.options["readWriteRatio"] = kwargs['read_write_ratio'] + if kwargs["read_write_ratio"] is not None: + workload_config.options["readWriteRatio"] = kwargs["read_write_ratio"] - if kwargs['use_pipeline'] is not None: - workload_config.options["usePipeline"] = kwargs['use_pipeline'] + if kwargs["use_pipeline"] is not None: + workload_config.options["usePipeline"] = kwargs["use_pipeline"] - if kwargs['async_mode'] is not None: - workload_config.options["asyncMode"] = kwargs['async_mode'] + if kwargs["async_mode"] is not None: + workload_config.options["asyncMode"] = kwargs["async_mode"] - if kwargs['pipeline_size'] is not None: - workload_config.options["pipelineSize"] = kwargs['pipeline_size'] + if kwargs["pipeline_size"] is not None: + workload_config.options["pipelineSize"] = kwargs["pipeline_size"] - if kwargs['transaction_size'] is not None: - 
workload_config.options["transactionSize"] = kwargs['transaction_size'] + if kwargs["transaction_size"] is not None: + workload_config.options["transactionSize"] = kwargs["transaction_size"] # Handle value size - if fixed size is provided, use it; otherwise use min/max - if kwargs['value_size'] is not None: - workload_config.options["valueSize"] = kwargs['value_size'] + if kwargs["value_size"] is not None: + workload_config.options["valueSize"] = kwargs["value_size"] else: # Set min/max values only if they are not None (allowing 0 as valid) - if kwargs['value_size_min'] is not None: - workload_config.options["valueSizeMin"] = kwargs['value_size_min'] - if kwargs['value_size_max'] is not None: - workload_config.options["valueSizeMax"] = kwargs['value_size_max'] + if kwargs["value_size_min"] is not None: + workload_config.options["valueSizeMin"] = kwargs["value_size_min"] + if kwargs["value_size_max"] is not None: + workload_config.options["valueSizeMax"] = kwargs["value_size_max"] # Add operation weights if provided if operation_weights: @@ -367,40 +644,41 @@ def _build_config_from_args(kwargs) -> RunnerConfig: # Build test config test_config = TestConfig( - mode="cluster" if kwargs.get('cluster', False) else "standalone", - clients=kwargs['clients'], - threads_per_client=kwargs['threads_per_client'], - duration=kwargs['duration'], - target_ops_per_second=kwargs['target_ops_per_second'], - workload=workload_config + mode="cluster" if kwargs.get("cluster", False) else "standalone", + clients=kwargs["clients"], + threads_per_client=kwargs["threads_per_client"], + duration=kwargs["duration"], + target_ops_per_second=kwargs["target_ops_per_second"], + workload=workload_config, ) # Build concatenated app name with workload profile - base_app_name = kwargs['app_name'] - workload_profile_name = kwargs.get('workload_profile', 'custom') + base_app_name = kwargs["app_name"] + workload_profile_name = kwargs.get("workload_profile", "custom") concatenated_app_name = 
f"{base_app_name}-{workload_profile_name}" # Auto-generate instance_id and run_id if not provided import uuid - instance_id = kwargs['instance_id'] or str(uuid.uuid4()) - run_id = kwargs['run_id'] or str(uuid.uuid4()) + + instance_id = kwargs["instance_id"] or str(uuid.uuid4()) + run_id = kwargs["run_id"] or str(uuid.uuid4()) # Build main runner config config = RunnerConfig( redis=redis_config, test=test_config, - log_level=kwargs['log_level'], - log_file=kwargs['log_file'], - metrics_interval=kwargs['metrics_interval'], - output_file=kwargs['output_file'], - quiet=kwargs['quiet'], - otel_endpoint=kwargs['otel_endpoint'], - otel_service_name=kwargs['otel_service_name'], - otel_export_interval_ms=kwargs['otel_export_interval'], + log_level=kwargs["log_level"], + log_file=kwargs["log_file"], + metrics_interval=kwargs["metrics_interval"], + output_file=kwargs["output_file"], + quiet=kwargs["quiet"], + otel_endpoint=kwargs["otel_endpoint"], + otel_service_name=kwargs["otel_service_name"], + otel_export_interval_ms=kwargs["otel_export_interval"], app_name=concatenated_app_name, instance_id=instance_id, run_id=run_id, - version=kwargs['version'] or get_redis_version() + version=kwargs["version"] or get_redis_version(), ) return config @@ -418,5 +696,5 @@ def _validate_config(config: RunnerConfig): raise ValueError("Workload type must be specified") -if __name__ == '__main__': +if __name__ == "__main__": cli() diff --git a/config.py b/config.py index e782e98..11c989b 100644 --- a/config.py +++ b/config.py @@ -1,6 +1,7 @@ """ Configuration management for Redis test application. 
""" + from dataclasses import dataclass, field from typing import Dict, List, Optional, Any, Union import yaml @@ -13,7 +14,7 @@ def get_redis_version() -> str: """Get the version of the redis-py package.""" try: - return importlib.metadata.version('redis') + return importlib.metadata.version("redis") except importlib.metadata.PackageNotFoundError: return "unknown" @@ -43,7 +44,7 @@ def parse_duration(duration_str: str) -> int: return int(duration_str) # Handle ISO 8601 format (PT1H30M45S) - if not duration_str.startswith('PT'): + if not duration_str.startswith("PT"): raise ValueError(f"Invalid duration format: {duration_str}") # Remove PT prefix @@ -55,17 +56,17 @@ def parse_duration(duration_str: str) -> int: seconds = 0 # Extract hours - hour_match = re.search(r'(\d+)H', duration_str) + hour_match = re.search(r"(\d+)H", duration_str) if hour_match: hours = int(hour_match.group(1)) # Extract minutes - minute_match = re.search(r'(\d+)M', duration_str) + minute_match = re.search(r"(\d+)M", duration_str) if minute_match: minutes = int(minute_match.group(1)) # Extract seconds - second_match = re.search(r'(\d+)S', duration_str) + second_match = re.search(r"(\d+)S", duration_str) if second_match: seconds = int(second_match.group(1)) @@ -75,6 +76,7 @@ def parse_duration(duration_str: str) -> int: @dataclass class RedisConnectionConfig: """Redis connection configuration matching lettuce-test-app structure.""" + client_name: str = "redis-py-test-app" host: str = "localhost" port: int = 6379 @@ -83,16 +85,18 @@ class RedisConnectionConfig: database: int = 0 # Changed from 'db' to match lettuce verify_peer: bool = False protocol: int = 3 # 2 for RESP2, 3 for RESP3 - + # Cluster configuration cluster_mode: bool = False cluster_nodes: List[Dict[str, Any]] = field(default_factory=list) - + # TLS configuration ssl: bool = False ssl_keyfile: Optional[str] = None ssl_certfile: Optional[str] = None - ssl_cert_reqs: Union[str, int] = "required" # Can be "required", "optional", 
"none" or ssl.VerifyMode + ssl_cert_reqs: Union[str, int] = ( + "required" # Can be "required", "optional", "none" or ssl.VerifyMode + ) ssl_ca_certs: Optional[str] = None ssl_ca_path: Optional[str] = None ssl_ca_data: Optional[str] = None @@ -100,13 +104,13 @@ class RedisConnectionConfig: ssl_password: Optional[str] = None ssl_min_version: Optional[ssl_module.TLSVersion] = None # ssl.TLSVersion ssl_ciphers: Optional[str] = None - + # Connection settings socket_timeout: Optional[float] = None socket_connect_timeout: Optional[float] = None socket_keepalive: bool = True socket_keepalive_options: Dict[str, int] = field(default_factory=dict) - + # Connection pool settings max_connections: int = 50 retry_on_timeout: bool = True @@ -114,14 +118,19 @@ class RedisConnectionConfig: # Redis client-level retry configuration (redis-py Retry object) # These retries happen at the Redis client level for network/connection issues client_retry_attempts: int = 3 - maintenance_notifications_enabled: bool = True + maintenance_notifications_enabled: Union[bool, str] = ( + True # Can be True, False, or 'auto' + ) maintenance_relaxed_timeout: Optional[float] = None @dataclass class WorkloadConfig: """Workload configuration matching lettuce-test-app structure.""" - type: str = "get_set" # Workload type (get_set, redis_commands, multi, pub_sub, etc.) + + type: str = ( + "get_set" # Workload type (get_set, redis_commands, multi, pub_sub, etc.) 
+ ) max_duration: Optional[str] = "PT60S" # ISO 8601 duration format # Options dictionary for workload-specific configuration @@ -181,6 +190,7 @@ class TestConfig: @dataclass class RunnerConfig: """Main runner configuration""" + redis: RedisConnectionConfig = field(default_factory=RedisConnectionConfig) test: TestConfig = field(default_factory=TestConfig) @@ -223,10 +233,9 @@ def get_profile(profile_name: str) -> WorkloadConfig: "valueSize": 100, "iterationCount": 1000, "keyPrefix": "rw_test", - "keyRange": 10000 - } + "keyRange": 10000, + }, ), - "high_throughput": WorkloadConfig( type="high_throughput", max_duration="PT60S", @@ -238,10 +247,9 @@ def get_profile(profile_name: str) -> WorkloadConfig: "usePipeline": True, "pipelineSize": 10, "keyPrefix": "perf_test", - "keyRange": 50000 - } + "keyRange": 50000, + }, ), - "list_operations": WorkloadConfig( type="list_operations", max_duration="PT60S", @@ -251,10 +259,9 @@ def get_profile(profile_name: str) -> WorkloadConfig: "valueSize": 100, "iterationCount": 1000, "elementsCount": 10, - "keyPrefix": "list_test" - } + "keyPrefix": "list_test", + }, ), - "pubsub_heavy": WorkloadConfig( type="pubsub_heavy", max_duration="PT60S", @@ -263,10 +270,9 @@ def get_profile(profile_name: str) -> WorkloadConfig: "operation_weights": {"PUBLISH": 0.7, "SUBSCRIBE": 0.3}, "channels": ["channel1", "channel2", "channel3"], "messageSize": 200, - "messageCount": 1000 - } + "messageCount": 1000, + }, ), - "transaction_heavy": WorkloadConfig( type="transaction_heavy", max_duration="PT60S", @@ -275,24 +281,28 @@ def get_profile(profile_name: str) -> WorkloadConfig: "transactionSize": 5, "valueSize": 100, "iterationCount": 500, - "keyPrefix": "tx_test" - } + "keyPrefix": "tx_test", + }, ), - "async_mixed": WorkloadConfig( type="async_mixed", max_duration="PT60S", options={ "operations": ["SET", "GET", "LPUSH", "LRANGE"], - "operation_weights": {"SET": 0.3, "GET": 0.4, "LPUSH": 0.2, "LRANGE": 0.1}, + "operation_weights": { + "SET": 0.3, + 
"GET": 0.4, + "LPUSH": 0.2, + "LRANGE": 0.1, + }, "asyncMode": True, "usePipeline": True, "pipelineSize": 20, "valueSize": 150, "iterationCount": 1500, - "awaitAllResponses": True - } - ) + "awaitAllResponses": True, + }, + ), } return profiles.get(profile_name, WorkloadConfig()) @@ -300,26 +310,33 @@ def get_profile(profile_name: str) -> WorkloadConfig: @staticmethod def list_profiles() -> List[str]: """List available workload profiles.""" - return ["basic_rw", "high_throughput", "list_operations", "pubsub_heavy", "transaction_heavy", "async_mixed"] + return [ + "basic_rw", + "high_throughput", + "list_operations", + "pubsub_heavy", + "transaction_heavy", + "async_mixed", + ] def load_config_from_file(file_path: str) -> RunnerConfig: """Load configuration from YAML or JSON file.""" - with open(file_path, 'r') as f: - if file_path.endswith('.yaml') or file_path.endswith('.yml'): + with open(file_path, "r") as f: + if file_path.endswith(".yaml") or file_path.endswith(".yml"): data = yaml.safe_load(f) else: data = json.load(f) # Convert nested dictionaries to dataclass instances - if 'redis' in data: - data['redis'] = RedisConnectionConfig(**data['redis']) + if "redis" in data: + data["redis"] = RedisConnectionConfig(**data["redis"]) - if 'test' in data: - test_data = data['test'] - if 'workload' in test_data: - test_data['workload'] = WorkloadConfig(**test_data['workload']) - data['test'] = TestConfig(**test_data) + if "test" in data: + test_data = data["test"] + if "workload" in test_data: + test_data["workload"] = WorkloadConfig(**test_data["workload"]) + data["test"] = TestConfig(**test_data) return RunnerConfig(**data) @@ -328,25 +345,25 @@ def save_config_to_file(config: RunnerConfig, file_path: str): """Save configuration to YAML file.""" # Convert dataclasses to dictionaries config_dict = { - 'redis': config.redis.__dict__, - 'test': { - 'mode': config.test.mode, - 'clients': config.test.clients, - 'threads_per_client': config.test.threads_per_client, - 
'duration': config.test.duration, - 'target_ops_per_second': config.test.target_ops_per_second, - 'workload': config.test.workload.__dict__ + "redis": config.redis.__dict__, + "test": { + "mode": config.test.mode, + "clients": config.test.clients, + "threads_per_client": config.test.threads_per_client, + "duration": config.test.duration, + "target_ops_per_second": config.test.target_ops_per_second, + "workload": config.test.workload.__dict__, }, - 'log_level': config.log_level, - 'log_file': config.log_file, - 'metrics_interval': config.metrics_interval, - 'output_file': config.output_file, - 'quiet': config.quiet, - 'otel_endpoint': config.otel_endpoint, - 'otel_service_name': config.otel_service_name, - 'otel_service_version': config.otel_service_version, - 'otel_resource_attributes': config.otel_resource_attributes + "log_level": config.log_level, + "log_file": config.log_file, + "metrics_interval": config.metrics_interval, + "output_file": config.output_file, + "quiet": config.quiet, + "otel_endpoint": config.otel_endpoint, + "otel_service_name": config.otel_service_name, + "otel_service_version": config.otel_service_version, + "otel_resource_attributes": config.otel_resource_attributes, } - with open(file_path, 'w') as f: + with open(file_path, "w") as f: yaml.dump(config_dict, f, default_flow_style=False, indent=2) diff --git a/redis_client.py b/redis_client.py index f9a7726..2995203 100644 --- a/redis_client.py +++ b/redis_client.py @@ -1,18 +1,17 @@ """ Redis client management with support for standalone, cluster, and TLS connections. 
""" + import time import ssl from typing import Optional, List, Dict, Any, Union import redis import redis.sentinel -from redis.cluster import RedisCluster +from redis.cluster import ClusterNode, RedisCluster from redis.maint_notifications import MaintNotificationsConfig from redis.retry import Retry from redis.backoff import ExponentialWithJitterBackoff -from redis.exceptions import ( - ConnectionError, TimeoutError, ClusterDownError -) +from redis.exceptions import ConnectionError, TimeoutError, ClusterDownError from config import RedisConnectionConfig @@ -27,27 +26,29 @@ def _convert_ssl_min_version(version_str: Optional[str]) -> Optional[ssl.TLSVers # Mapping of string values to ssl.TLSVersion enum values version_mapping = { - 'TLSv1': ssl.TLSVersion.TLSv1, - 'TLSv1_1': ssl.TLSVersion.TLSv1_1, - 'TLSv1_2': ssl.TLSVersion.TLSv1_2, - 'TLSv1_3': ssl.TLSVersion.TLSv1_3, + "TLSv1": ssl.TLSVersion.TLSv1, + "TLSv1_1": ssl.TLSVersion.TLSv1_1, + "TLSv1_2": ssl.TLSVersion.TLSv1_2, + "TLSv1_3": ssl.TLSVersion.TLSv1_3, # Also support lowercase variants - 'tlsv1': ssl.TLSVersion.TLSv1, - 'tlsv1_1': ssl.TLSVersion.TLSv1_1, - 'tlsv1_2': ssl.TLSVersion.TLSv1_2, - 'tlsv1_3': ssl.TLSVersion.TLSv1_3, + "tlsv1": ssl.TLSVersion.TLSv1, + "tlsv1_1": ssl.TLSVersion.TLSv1_1, + "tlsv1_2": ssl.TLSVersion.TLSv1_2, + "tlsv1_3": ssl.TLSVersion.TLSv1_3, # Support numeric versions - '1.0': ssl.TLSVersion.TLSv1, - '1.1': ssl.TLSVersion.TLSv1_1, - '1.2': ssl.TLSVersion.TLSv1_2, - '1.3': ssl.TLSVersion.TLSv1_3, + "1.0": ssl.TLSVersion.TLSv1, + "1.1": ssl.TLSVersion.TLSv1_1, + "1.2": ssl.TLSVersion.TLSv1_2, + "1.3": ssl.TLSVersion.TLSv1_3, } if version_str in version_mapping: return version_mapping[version_str] else: - raise ValueError(f"Unsupported SSL version: {version_str}. " - f"Supported versions: {list(version_mapping.keys())}") + raise ValueError( + f"Unsupported SSL version: {version_str}. 
" + f"Supported versions: {list(version_mapping.keys())}" + ) def _convert_ssl_cert_reqs(cert_reqs: Union[str, int]) -> Union[ssl.VerifyMode, int]: @@ -57,30 +58,32 @@ def _convert_ssl_cert_reqs(cert_reqs: Union[str, int]) -> Union[ssl.VerifyMode, # Mapping of string values to ssl.VerifyMode enum values cert_reqs_mapping = { - 'none': ssl.CERT_NONE, - 'optional': ssl.CERT_OPTIONAL, - 'required': ssl.CERT_REQUIRED, + "none": ssl.CERT_NONE, + "optional": ssl.CERT_OPTIONAL, + "required": ssl.CERT_REQUIRED, # Also support uppercase variants - 'NONE': ssl.CERT_NONE, - 'OPTIONAL': ssl.CERT_OPTIONAL, - 'REQUIRED': ssl.CERT_REQUIRED, + "NONE": ssl.CERT_NONE, + "OPTIONAL": ssl.CERT_OPTIONAL, + "REQUIRED": ssl.CERT_REQUIRED, } if cert_reqs in cert_reqs_mapping: return cert_reqs_mapping[cert_reqs] else: - raise ValueError(f"Unsupported SSL cert requirements: {cert_reqs}. " - f"Supported values: {list(cert_reqs_mapping.keys())}") + raise ValueError( + f"Unsupported SSL cert requirements: {cert_reqs}. 
" + f"Supported values: {list(cert_reqs_mapping.keys())}" + ) class RedisClient: """Manages Redis connections with automatic reconnection and error handling.""" - + def __init__(self, config: RedisConnectionConfig): self.config = config self.logger = get_logger() self.metrics = get_metrics_collector() - + self._client: Optional[Union[redis.Redis, RedisCluster]] = None # Connection pool configuration @@ -88,63 +91,69 @@ def __init__(self, config: RedisConnectionConfig): # Initialize connection self._connect() - + def _build_pool_kwargs(self) -> Dict[str, Any]: """Build connection pool keyword arguments.""" kwargs = { - 'socket_keepalive': self.config.socket_keepalive, - 'socket_keepalive_options': self.config.socket_keepalive_options, - 'max_connections': self.config.max_connections, + "socket_keepalive": self.config.socket_keepalive, + "socket_keepalive_options": self.config.socket_keepalive_options, + "max_connections": self.config.max_connections, } # Only add timeout parameters if they are not None (let redis-py use defaults) if self.config.socket_timeout is not None: - kwargs['socket_timeout'] = self.config.socket_timeout + kwargs["socket_timeout"] = self.config.socket_timeout if self.config.socket_connect_timeout is not None: - kwargs['socket_connect_timeout'] = self.config.socket_connect_timeout + kwargs["socket_connect_timeout"] = self.config.socket_connect_timeout # Create Retry object for client-level retries (network/connection issues) if self.config.client_retry_attempts >= 0: - kwargs['retry'] = Retry(ExponentialWithJitterBackoff(), self.config.client_retry_attempts) + kwargs["retry"] = Retry( + ExponentialWithJitterBackoff(), self.config.client_retry_attempts + ) # Add authentication if provided if self.config.password: - kwargs['password'] = self.config.password - + kwargs["password"] = self.config.password + # Add SSL configuration if enabled if self.config.ssl: - ssl_kwargs = {'ssl': True} + ssl_kwargs = {"ssl": True} # Add SSL parameters directly 
as redis-py expects them if self.config.ssl_keyfile is not None: - ssl_kwargs['ssl_keyfile'] = self.config.ssl_keyfile + ssl_kwargs["ssl_keyfile"] = self.config.ssl_keyfile if self.config.ssl_certfile is not None: - ssl_kwargs['ssl_certfile'] = self.config.ssl_certfile + ssl_kwargs["ssl_certfile"] = self.config.ssl_certfile if self.config.ssl_cert_reqs is not None: - ssl_kwargs['ssl_cert_reqs'] = _convert_ssl_cert_reqs(self.config.ssl_cert_reqs) + ssl_kwargs["ssl_cert_reqs"] = _convert_ssl_cert_reqs( + self.config.ssl_cert_reqs + ) if self.config.ssl_ca_certs is not None: - ssl_kwargs['ssl_ca_certs'] = self.config.ssl_ca_certs + ssl_kwargs["ssl_ca_certs"] = self.config.ssl_ca_certs if self.config.ssl_ca_path is not None: - ssl_kwargs['ssl_ca_path'] = self.config.ssl_ca_path + ssl_kwargs["ssl_ca_path"] = self.config.ssl_ca_path if self.config.ssl_ca_data is not None: - ssl_kwargs['ssl_ca_data'] = self.config.ssl_ca_data + ssl_kwargs["ssl_ca_data"] = self.config.ssl_ca_data if self.config.ssl_check_hostname is not None: - ssl_kwargs['ssl_check_hostname'] = self.config.ssl_check_hostname + ssl_kwargs["ssl_check_hostname"] = self.config.ssl_check_hostname if self.config.ssl_password is not None: - ssl_kwargs['ssl_password'] = self.config.ssl_password + ssl_kwargs["ssl_password"] = self.config.ssl_password if self.config.ssl_min_version is not None: # Handle both string and TLSVersion objects if isinstance(self.config.ssl_min_version, str): - ssl_kwargs['ssl_min_version'] = _convert_ssl_min_version(self.config.ssl_min_version) + ssl_kwargs["ssl_min_version"] = _convert_ssl_min_version( + self.config.ssl_min_version + ) else: - ssl_kwargs['ssl_min_version'] = self.config.ssl_min_version + ssl_kwargs["ssl_min_version"] = self.config.ssl_min_version if self.config.ssl_ciphers is not None: - ssl_kwargs['ssl_ciphers'] = self.config.ssl_ciphers + ssl_kwargs["ssl_ciphers"] = self.config.ssl_ciphers kwargs.update(ssl_kwargs) - + return kwargs - + def _connect(self) -> bool: 
"""Establish connection to Redis.""" start_time = time.time() @@ -159,8 +168,10 @@ def _connect(self) -> bool: self._client.ping() connection_duration = time.time() - start_time - #TODO: @elena-kolevska record connection metrics - self.logger.info(f"Successfully connected to Redis in {connection_duration:.3f}s") + # TODO: @elena-kolevska record connection metrics + self.logger.info( + f"Successfully connected to Redis in {connection_duration:.3f}s" + ) return True @@ -169,25 +180,31 @@ def _connect(self) -> bool: self._client = None raise e - + def _connect_standalone(self): """Connect to standalone Redis instance.""" start_time = time.time() - if self.config.maintenance_notifications_enabled: + if self.config.maintenance_notifications_enabled is not False: # Build maintenance events config, only passing relaxed_timeouts if not None - maintenance_config_kwargs = {"enabled": self.config.maintenance_notifications_enabled} + # maintenance_notifications_enabled can be True, False, or 'auto' + maintenance_config_kwargs = { + "enabled": self.config.maintenance_notifications_enabled + } if self.config.maintenance_relaxed_timeout is not None: - maintenance_config_kwargs["relaxed_timeout"] = self.config.maintenance_relaxed_timeout - + maintenance_config_kwargs["relaxed_timeout"] = ( + self.config.maintenance_relaxed_timeout + ) self._client = redis.Redis( host=self.config.host, port=self.config.port, db=self.config.database, - maint_notifications_config=MaintNotificationsConfig(**maintenance_config_kwargs), + maint_notifications_config=MaintNotificationsConfig( + **maintenance_config_kwargs + ), protocol=self.config.protocol, - **self._pool_kwargs + **self._pool_kwargs, ) else: self._client = redis.Redis( @@ -195,40 +212,70 @@ def _connect_standalone(self): port=self.config.port, db=self.config.database, protocol=self.config.protocol, - **self._pool_kwargs + **self._pool_kwargs, ) - self.metrics.record_client_init_duration(time.time() - start_time, client="standalone-sync") 
- + self.metrics.record_client_init_duration( + time.time() - start_time, client="standalone-sync" + ) + def _connect_cluster(self): """Connect to Redis Cluster.""" + startup_nodes = [] if self.config.cluster_nodes: - startup_nodes = self.config.cluster_nodes + for node in self.config.cluster_nodes: + cluster_node = ClusterNode(node["host"], node["port"]) + startup_nodes.append(cluster_node) else: - startup_nodes = [{"host": self.config.host, "port": self.config.port}] + startup_nodes = [ClusterNode(self.config.host, self.config.port)] start_time = time.time() - self._client = RedisCluster( - startup_nodes=startup_nodes, - decode_responses=False, - skip_full_coverage_check=True, - **self._pool_kwargs + + if self.config.maintenance_notifications_enabled is not False: + # Build maintenance events config, only passing relaxed_timeouts if not None + # maintenance_notifications_enabled can be True, False, or 'auto' + maintenance_config_kwargs = { + "enabled": self.config.maintenance_notifications_enabled + } + if self.config.maintenance_relaxed_timeout is not None: + maintenance_config_kwargs["relaxed_timeout"] = ( + self.config.maintenance_relaxed_timeout + ) + self._client = RedisCluster( + startup_nodes=startup_nodes, + decode_responses=False, + skip_full_coverage_check=True, + protocol=self.config.protocol, + maint_notifications_config=MaintNotificationsConfig( + **maintenance_config_kwargs + ), + **self._pool_kwargs, + ) + else: + self._client = RedisCluster( + startup_nodes=startup_nodes, + decode_responses=False, + protocol=self.config.protocol, + skip_full_coverage_check=True, + **self._pool_kwargs, + ) + self.metrics.record_client_init_duration( + time.time() - start_time, client="cluster-sync" ) - self.metrics.record_client_init_duration(time.time() - start_time, client="cluster-sync") def pipeline(self, transaction: bool = True): """Create a pipeline for batch operations.""" return self._client.pipeline(transaction=transaction) - + def pubsub(self, **kwargs): 
return self._client.pubsub(**kwargs) - + def close(self): """Close the Redis connection.""" if self._client: try: - if hasattr(self._client, 'close'): + if hasattr(self._client, "close"): self._client.close() - elif hasattr(self._client, 'connection_pool'): + elif hasattr(self._client, "connection_pool"): self._client.connection_pool.disconnect() except Exception as e: self.logger.warning(f"Error closing Redis connection: {e}") @@ -239,7 +286,9 @@ def get_info(self) -> Dict[str, Any]: """Get Redis server information.""" return self._client.info() - def _execute_with_metrics(self, operation_name: str, client_method, *args, **kwargs): + def _execute_with_metrics( + self, operation_name: str, client_method, *args, **kwargs + ): """ Helper method to execute Redis operations with metrics tracking. @@ -262,7 +311,9 @@ def _execute_with_metrics(self, operation_name: str, client_method, *args, **kwa self.metrics.record_operation(operation_name, duration, False, error_type) # TODO @elena-kolevska add a separate counter for network errors - self.logger.warning(f"Redis client error for {operation_name}: {error_type} - {e}") + self.logger.warning( + f"Redis client error for {operation_name}: {error_type} - {e}" + ) raise except Exception as e: @@ -271,163 +322,169 @@ def _execute_with_metrics(self, operation_name: str, client_method, *args, **kwa error_type = type(e).__name__ self.metrics.record_operation(operation_name, duration, False, error_type) - self.logger.error(f"Redis operation error for {operation_name}: {error_type} - {e}") + self.logger.error( + f"Redis operation error for {operation_name}: {error_type} - {e}" + ) raise # Basic Redis operations with direct client method calls def set(self, key: str, value: str, **kwargs) -> bool: """Set a key-value pair.""" - return self._execute_with_metrics('SET', self._client.set, key, value, **kwargs) + return self._execute_with_metrics("SET", self._client.set, key, value, **kwargs) def get(self, key: str) -> Optional[str]: 
"""Get value by key.""" - return self._execute_with_metrics('GET', self._client.get, key) + return self._execute_with_metrics("GET", self._client.get, key) def delete(self, *keys: str) -> int: """Delete one or more keys.""" - return self._execute_with_metrics('DELETE', self._client.delete, *keys) + return self._execute_with_metrics("DELETE", self._client.delete, *keys) def incr(self, key: str, amount: int = 1) -> int: """Increment a key by amount.""" if amount == 1: - return self._execute_with_metrics('INCR', self._client.incr, key) + return self._execute_with_metrics("INCR", self._client.incr, key) else: - return self._execute_with_metrics('INCR', self._client.incrby, key, amount) + return self._execute_with_metrics("INCR", self._client.incrby, key, amount) def decr(self, key: str, amount: int = 1) -> int: """Decrement a key by amount.""" if amount == 1: - return self._execute_with_metrics('DECR', self._client.decr, key) + return self._execute_with_metrics("DECR", self._client.decr, key) else: - return self._execute_with_metrics('DECR', self._client.decrby, key, amount) + return self._execute_with_metrics("DECR", self._client.decrby, key, amount) # List operations def lpush(self, key: str, *values: str) -> int: """Push values to the left of a list.""" - return self._execute_with_metrics('LPUSH', self._client.lpush, key, *values) + return self._execute_with_metrics("LPUSH", self._client.lpush, key, *values) def rpush(self, key: str, *values: str) -> int: """Push values to the right of a list.""" - return self._execute_with_metrics('RPUSH', self._client.rpush, key, *values) + return self._execute_with_metrics("RPUSH", self._client.rpush, key, *values) def lpop(self, key: str, count: Optional[int] = None) -> Optional[str]: """Pop value from the left of a list.""" if count is not None: - return self._execute_with_metrics('LPOP', self._client.lpop, key, count) + return self._execute_with_metrics("LPOP", self._client.lpop, key, count) else: - return 
self._execute_with_metrics('LPOP', self._client.lpop, key) + return self._execute_with_metrics("LPOP", self._client.lpop, key) def rpop(self, key: str, count: Optional[int] = None) -> Optional[str]: """Pop value from the right of a list.""" if count is not None: - return self._execute_with_metrics('RPOP', self._client.rpop, key, count) + return self._execute_with_metrics("RPOP", self._client.rpop, key, count) else: - return self._execute_with_metrics('RPOP', self._client.rpop, key) + return self._execute_with_metrics("RPOP", self._client.rpop, key) def lrange(self, key: str, start: int, end: int) -> List[str]: """Get a range of elements from a list.""" - return self._execute_with_metrics('LRANGE', self._client.lrange, key, start, end) + return self._execute_with_metrics( + "LRANGE", self._client.lrange, key, start, end + ) def llen(self, key: str) -> int: """Get the length of a list.""" - return self._execute_with_metrics('LLEN', self._client.llen, key) + return self._execute_with_metrics("LLEN", self._client.llen, key) def ltrim(self, key: str, start: int, end: int) -> bool: """Trim a list to the specified range.""" - return self._execute_with_metrics('LTRIM', self._client.ltrim, key, start, end) + return self._execute_with_metrics("LTRIM", self._client.ltrim, key, start, end) # String operations def append(self, key: str, value: str) -> int: """Append a value to a string.""" - return self._execute_with_metrics('APPEND', self._client.append, key, value) + return self._execute_with_metrics("APPEND", self._client.append, key, value) def strlen(self, key: str) -> int: """Get the length of a string.""" - return self._execute_with_metrics('STRLEN', self._client.strlen, key) + return self._execute_with_metrics("STRLEN", self._client.strlen, key) # Key operations def exists(self, *keys: str) -> int: """Check if keys exist.""" - return self._execute_with_metrics('EXISTS', self._client.exists, *keys) + return self._execute_with_metrics("EXISTS", self._client.exists, *keys) 
def expire(self, key: str, time: int) -> bool: """Set a key's time to live in seconds.""" - return self._execute_with_metrics('EXPIRE', self._client.expire, key, time) + return self._execute_with_metrics("EXPIRE", self._client.expire, key, time) def ttl(self, key: str) -> int: """Get the time to live for a key.""" - return self._execute_with_metrics('TTL', self._client.ttl, key) + return self._execute_with_metrics("TTL", self._client.ttl, key) def type(self, key: str) -> str: """Get the type of a key.""" - return self._execute_with_metrics('TYPE', self._client.type, key) + return self._execute_with_metrics("TYPE", self._client.type, key) def incrby(self, key: str, amount: int) -> int: """Increment a key by a specific amount.""" - return self._execute_with_metrics('INCRBY', self._client.incrby, key, amount) + return self._execute_with_metrics("INCRBY", self._client.incrby, key, amount) def decrby(self, key: str, amount: int) -> int: """Decrement a key by a specific amount.""" - return self._execute_with_metrics('DECRBY', self._client.decrby, key, amount) + return self._execute_with_metrics("DECRBY", self._client.decrby, key, amount) # Set operations def sadd(self, key: str, *values: str) -> int: """Add members to a set.""" - return self._execute_with_metrics('SADD', self._client.sadd, key, *values) + return self._execute_with_metrics("SADD", self._client.sadd, key, *values) def srem(self, key: str, *values: str) -> int: """Remove members from a set.""" - return self._execute_with_metrics('SREM', self._client.srem, key, *values) + return self._execute_with_metrics("SREM", self._client.srem, key, *values) def smembers(self, key: str) -> set: """Get all members of a set.""" - return self._execute_with_metrics('SMEMBERS', self._client.smembers, key) + return self._execute_with_metrics("SMEMBERS", self._client.smembers, key) def scard(self, key: str) -> int: """Get the number of members in a set.""" - return self._execute_with_metrics('SCARD', self._client.scard, key) + 
return self._execute_with_metrics("SCARD", self._client.scard, key) # Hash operations def hset(self, key: str, field: str, value: str) -> int: """Set a field in a hash.""" - return self._execute_with_metrics('HSET', self._client.hset, key, field, value) + return self._execute_with_metrics("HSET", self._client.hset, key, field, value) def hget(self, key: str, field: str) -> Optional[str]: """Get a field from a hash.""" - return self._execute_with_metrics('HGET', self._client.hget, key, field) + return self._execute_with_metrics("HGET", self._client.hget, key, field) def hdel(self, key: str, *fields: str) -> int: """Delete fields from a hash.""" - return self._execute_with_metrics('HDEL', self._client.hdel, key, *fields) + return self._execute_with_metrics("HDEL", self._client.hdel, key, *fields) def hgetall(self, key: str) -> Dict[str, str]: """Get all fields and values from a hash.""" - return self._execute_with_metrics('HGETALL', self._client.hgetall, key) + return self._execute_with_metrics("HGETALL", self._client.hgetall, key) def hlen(self, key: str) -> int: """Get the number of fields in a hash.""" - return self._execute_with_metrics('HLEN', self._client.hlen, key) + return self._execute_with_metrics("HLEN", self._client.hlen, key) # Sorted Set operations def zadd(self, key: str, mapping: Dict[str, float]) -> int: """Add members to a sorted set.""" - return self._execute_with_metrics('ZADD', self._client.zadd, key, mapping) + return self._execute_with_metrics("ZADD", self._client.zadd, key, mapping) def zrem(self, key: str, *members: str) -> int: """Remove members from a sorted set.""" - return self._execute_with_metrics('ZREM', self._client.zrem, key, *members) + return self._execute_with_metrics("ZREM", self._client.zrem, key, *members) def zrange(self, key: str, start: int, end: int, withscores: bool = False) -> List: """Get a range of members from a sorted set.""" - return self._execute_with_metrics('ZRANGE', self._client.zrange, key, start, end, 
withscores=withscores) + return self._execute_with_metrics( + "ZRANGE", self._client.zrange, key, start, end, withscores=withscores + ) def zcard(self, key: str) -> int: """Get the number of members in a sorted set.""" - return self._execute_with_metrics('ZCARD', self._client.zcard, key) + return self._execute_with_metrics("ZCARD", self._client.zcard, key) def zscore(self, key: str, member: str) -> Optional[float]: """Get the score of a member in a sorted set.""" - return self._execute_with_metrics('ZSCORE', self._client.zscore, key, member) + return self._execute_with_metrics("ZSCORE", self._client.zscore, key, member) # Pub/Sub operations def publish(self, channel: str, message: str) -> int: @@ -437,13 +494,15 @@ def publish(self, channel: str, message: str) -> int: result = self._client.publish(channel, message) duration = max(0.0, time.time() - start_time) # Record both general operation metrics and pub/sub specific metrics - self.metrics.record_operation('PUBLISH', duration, True) - self.metrics.record_pubsub_operation(channel, 'PUBLISH', success=True) + self.metrics.record_operation("PUBLISH", duration, True) + self.metrics.record_pubsub_operation(channel, "PUBLISH", success=True) return result except Exception as e: duration = max(0.0, time.time() - start_time) - self.metrics.record_operation('PUBLISH', duration, False, type(e).__name__) - self.metrics.record_pubsub_operation(channel, 'PUBLISH', success=False, error_type=type(e).__name__) + self.metrics.record_operation("PUBLISH", duration, False, type(e).__name__) + self.metrics.record_pubsub_operation( + channel, "PUBLISH", success=False, error_type=type(e).__name__ + ) raise def pubsub(self): @@ -454,6 +513,3 @@ def pubsub(self): def pipeline(self, transaction: bool = True): """Get a pipeline instance.""" return self._client.pipeline(transaction=transaction) - - - diff --git a/requirements.txt b/requirements.txt index ae2a7cc..4ebd5da 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,6 @@ 
-redis==7.0.0b2 +# install redis-py from branch feat/hitless_upgrade_sync_cluster_client +git+https://github.com/redis/redis-py.git@feat/hitless_upgrade_sync_cluster_client#egg=redis +#redis==7.1.0 #redis==6.4.0 click>=8.0.0 prometheus-client>=0.16.0