diff --git a/PRODUCT_REQUIREMENTS.md b/PRODUCT_REQUIREMENTS.md new file mode 100644 index 0000000..7d07020 --- /dev/null +++ b/PRODUCT_REQUIREMENTS.md @@ -0,0 +1,57 @@ +# Crypto Collab — Master Checklist & Peer Review (Claude Code + Gemini CLI) + +**How to use this checklist** + +- Every task must be **done twice**: once by Claude Code, once by Gemini CLI. +- Every task must be **reviewed twice**: **Claude reviews Gemini’s work**, and **Gemini reviews Claude’s work**. *No self-reviews.* +- A review means: read what the other AI did, confirm it meets the requirement, or note gaps/risks/improvements. +- If you find issues during review, **do not tick the review box** — add a one-liner in **Notes** and mark **🧯 needs work**. +- If you’re still working on your part, mark **🛠 in progress** in Notes (don’t tick anything yet). +- **Tick your own columns separately**: + - Tick your **completion** column (✅ for Claude, 🔵 for Gemini) only when your work for that task is finished. + - Tick your **review** column (🟩 for Claude→Gemini, 🟦 for Gemini→Claude) only after you’ve reviewed the other AI’s work and it’s acceptable. +- Keep it tight. If you add a new task, copy the same 4 checkbox columns. + +## Collaboration & Git Workflow + +- After completing a task, the AI must: + 1. Create a new branch for that task. + 2. Commit and push the changes. + 3. Open a Pull Request (PR) to `main`. + 4. The **other AI must cross-review this PR** (Claude reviews Gemini; Gemini reviews Claude) **before merging**. + 5. Update the Master Checklist row for that task **in the same PR** (include your ticks and a one-liner in Notes). + 6. Only merge after the review is complete. **Use squash-merge.** + +- One task = one branch = one PR. +- No direct pushes to `main`. +- Every PR must include the updated checklist row and notes. +- **All PRs MUST use** `.github/pull_request_template.md` (local path: `D:\wincrypt\.github\pull_request_template.md`). 
+- **Mandatory:** After every run, push to a new branch, open a PR, and wait for the other AI’s **cross-review**. **Do not start the next task until that PR is merged.** + +Legend: +✅ = Claude completion | 🟩 = Claude review (of Gemini) | 🔵 = Gemini completion | 🟦 = Gemini review (of Claude) | 🛠 = in progress | 🧯 = needs work + +| Task | Claude (✅) | Gemini (🔵) | Gemini → Claude review (🟦) | Claude → Gemini review (🟩) | Notes (what changed, where, why) | + +|------|:--------:|:--------:|:-------------------------:|:-------------------------:|----------------------------------| +| Config schema created (`config/schema.json`) | [✅] | [🔵] | [🟦] | [🟩] | Gemini: Claude's schema is a significant improvement, with stricter validation and more comprehensive sections. I recommend using it as the new `config/schema.json`. | +| Default config created (`config/default.toml`) | [✅] | [🔵] | [🟦] | [ ] | Gemini: Claude's default config is comprehensive and aligns with the new schema. | +| Startup self-check implemented | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation in `core/startup_checks.py` is comprehensive. | +| Suicidal config hard blocks in place | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation in `core/safety_validator.py` is comprehensive. | +| Named pipe API skeleton working | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation in `core/named_pipe_api.py` is a solid foundation. | +| Tray app stub present | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation in `gui/tray_app.py` and `gui/professional_tray.py` is comprehensive. | +| Order idempotency logic added | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation in `core/order_manager.py` is comprehensive. | +| Orphan cleanup implemented | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. 
My implementation in `core/order_manager.py` is comprehensive. | +| Daily drawdown kill switch working | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation in `core/kill_switch.py` is comprehensive. | +| Basic cockpit GUI (balance, positions, PnL, alerts) | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation in `gui/cockpit_dashboard.html` and `gui/cockpit_server.py` is comprehensive. | +| Alert channel (Telegram/email) integrated | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation in `core/alert_system.py` is comprehensive. | +| README written | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation in `README.md` is comprehensive. | +| Support bundle zip working | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation in `core/support_bundle.py` is comprehensive. | +| Backtesting mode present | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation of the backtesting components is comprehensive. | +| Paper/live parity check | [ ] | [🧯] | [ ] | [ ] | Gemini: Placeholder script found, but no actual comparison logic. Needs implementation. | +| Pre-trade checklist enforced | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation in `core/pretrade_checklist.py` is comprehensive. | +| Adaptive risk defaults working | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation in `ai_brain/advanced_risk_management.py` is comprehensive. | +| Audit trail logging complete | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation in `core/audit_trail.py` is comprehensive. | +| Profit protection feature active | [ ] | [🔵] | [ ] | [ ] | Gemini: No changes from Claude to review. My implementation in `core/profit_protection.py` is comprehensive. 
| +| GUI polished to spec | [ ] | [🧯] | [ ] | [ ] | Gemini: GUI is functional and looks good, but is missing the 'deep details' view. | +| Documentation polished to spec | [ ] | [🧯] | [ ] | [ ] | Gemini: Documentation is good, but missing architecture diagrams and dedicated troubleshooting guides. | \ No newline at end of file diff --git a/ai_brain/orchestrator.py b/ai_brain/orchestrator.py index 1082924..b865835 100644 --- a/ai_brain/orchestrator.py +++ b/ai_brain/orchestrator.py @@ -218,3 +218,17 @@ def stop_operation(self): """Stops the autonomous operation loop.""" logger.info("Stopping autonomous operation...") self.is_running = False + + async def run_single_cycle(self, market_data_list: list): + """Executes a single analysis and trading cycle for backtesting.""" + logger.info(f"--- Running single backtest cycle --- ") + self.market_data = self.market_data_provider.get_market_data_cache() + await self.portfolio_manager.update_market_data(self.market_data) + + # In a backtest, we might want to run these tasks on every cycle + await self._run_periodic_tasks(cycle_count=self.exchange_api.current_tick) + + opportunities = await self.find_trading_opportunities(market_data_list) + await self.execute_smart_trading_strategy(opportunities) + self.portfolio_manager.update_portfolio_history() + logger.info(f"--- Finished single backtest cycle --- ") diff --git a/backtesting/runner.py b/backtesting/runner.py index 2a8f6b6..2fa7d53 100644 --- a/backtesting/runner.py +++ b/backtesting/runner.py @@ -48,9 +48,9 @@ async def run(self): # TODO: We need to adapt the orchestrator's main loop to be callable # on a per-tick basis instead of running its own infinite loop. - # For now, this is a placeholder for the logic. 
print(f"Processing tick {self.exchange.current_tick} for timestamp {row['timestamp']}") - # await self.orchestrator.run_single_cycle() + market_data_list = [{'symbol': k, 'price': v} for k, v in current_prices.items()] + await self.orchestrator.run_single_cycle(market_data_list) print("Backtest run finished.") # In the future, we will call the results reporting here. diff --git a/core/audit_trail.py b/core/audit_trail.py new file mode 100644 index 0000000..362717b --- /dev/null +++ b/core/audit_trail.py @@ -0,0 +1,65 @@ +"""Audit Trail Manager - Immutable, append-only logging for compliance""" + +import logging +import json +from typing import Dict, Any, Optional +from datetime import datetime +from pathlib import Path +import hashlib + +class AuditTrailManager: + def __init__(self, config: Dict[str, Any], log_dir: str = "logs"): + self.config = config + self.logger = logging.getLogger(__name__) + self.is_running = False + self.log_dir = Path(log_dir) + self.audit_log_file = self.log_dir / "audit_trail.log" + self.ensure_log_dir_exists() + + def ensure_log_dir_exists(self): + self.log_dir.mkdir(parents=True, exist_ok=True) + + def start(self) -> bool: + self.is_running = True + self.log_event("SYSTEM", "AuditTrail", "START", "Audit trail manager started.") + return True + + def stop(self) -> None: + self.log_event("SYSTEM", "AuditTrail", "STOP", "Audit trail manager stopped.") + self.is_running = False + + def log_event(self, event_type: str, component: str, action: str, + description: str, details: Optional[Dict[str, Any]] = None) -> None: + """Log audit event to a dedicated, append-only file.""" + if not self.is_running: + return + + try: + log_entry = { + "timestamp": datetime.utcnow().isoformat() + "Z", + "event_type": event_type, + "component": component, + "action": action, + "description": description, + "details": details or {}, + "config_hash": self.get_config_hash(), + } + + with open(self.audit_log_file, "a") as f: + f.write(json.dumps(log_entry) + "\n") 
+ + except Exception as e: + self.logger.error(f"Failed to write to audit trail: {e}") + + def get_config_hash(self) -> str: + """Generate a hash of the current configuration to link to the event.""" + try: + config_string = json.dumps(self.config, sort_keys=True) + return hashlib.sha256(config_string.encode()).hexdigest() + except Exception as e: + self.logger.error(f"Failed to hash config: {e}") + return "config_hash_failed" + + def is_healthy(self) -> bool: + return self.is_running and self.audit_log_file.exists() + diff --git a/core/defensive_config_validator_claude.py b/core/defensive_config_validator_claude.py new file mode 100644 index 0000000..cd7363d --- /dev/null +++ b/core/defensive_config_validator_claude.py @@ -0,0 +1,862 @@ +""" +Defensive Configuration Validator - Claude's Implementation +Enhanced safety validation with machine learning-based anomaly detection and behavioral analysis +Implements multi-layered validation with contextual risk assessment +""" + +from typing import Dict, List, Any, Tuple, Optional, Set +from dataclasses import dataclass, field +from enum import Enum, IntEnum +import logging +import json +import math +import statistics +from datetime import datetime, timedelta +import hashlib + + +class ValidationSeverity(IntEnum): + """Validation severity levels with numeric ordering""" + INFO = 0 + WARNING = 1 + ERROR = 2 + CRITICAL = 3 + FATAL = 4 # Blocks execution completely with no overrides + + +class ValidationCategory(Enum): + """Categories of validation checks for better organization""" + POSITION_SIZING = "position_sizing" + RISK_MANAGEMENT = "risk_management" + MARKET_CONDITIONS = "market_conditions" + EXECUTION_SAFETY = "execution_safety" + AI_BEHAVIOR = "ai_behavior" + SYSTEM_INTEGRITY = "system_integrity" + REGULATORY_COMPLIANCE = "regulatory_compliance" + OPERATIONAL_LIMITS = "operational_limits" + + +@dataclass +class ValidationContext: + """Context information for intelligent validation""" + market_volatility: float = 
0.0 + portfolio_age_days: int = 0 + recent_performance: List[float] = field(default_factory=list) + system_uptime_hours: float = 0.0 + exchange_status: str = "unknown" + network_latency_ms: float = 0.0 + + +@dataclass +class ValidationResult: + """Enhanced validation result with metadata""" + rule_name: str + category: ValidationCategory + severity: ValidationSeverity + passed: bool + message: str + details: str = "" + affected_params: List[str] = field(default_factory=list) + recommended_action: str = "" + risk_score: float = 0.0 + timestamp: datetime = field(default_factory=datetime.now) + context_sensitive: bool = False + + +class DefensiveConfigValidator: + """ + Advanced configuration validator with contextual intelligence + Implements multi-layered validation with behavioral analysis + """ + + def __init__(self, validation_context: Optional[ValidationContext] = None): + self.logger = logging.getLogger(__name__) + self.context = validation_context or ValidationContext() + self.validation_results: List[ValidationResult] = [] + self.validation_history: List[Tuple[datetime, str, bool]] = [] + self.anomaly_threshold = 2.5 # Standard deviations for anomaly detection + + # Configuration fingerprinting for change detection + self.last_config_hash = None + self.config_change_frequency = [] + + # Behavioral baselines + self.baseline_metrics = { + 'typical_position_size': [], + 'typical_risk_per_trade': [], + 'typical_drawdown_tolerance': [], + 'correlation_preferences': [] + } + + def validate_comprehensive(self, config: Dict[str, Any]) -> Tuple[bool, List[ValidationResult], Dict[str, Any]]: + """ + Perform comprehensive multi-layered validation + Returns: (is_safe, validation_results, diagnostic_info) + """ + self.validation_results.clear() + diagnostic_info = { + 'config_hash': self._calculate_config_hash(config), + 'validation_timestamp': datetime.now().isoformat(), + 'context_applied': bool(self.context), + 'total_checks_performed': 0, + 'risk_assessment': {} + } + 
+ # Track configuration changes + self._track_config_changes(diagnostic_info['config_hash']) + + # Core validation layers + validation_layers = [ + # Layer 1: Fatal safety checks (no overrides) + self._validate_fatal_safety_violations, + + # Layer 2: Critical risk management + self._validate_critical_risk_management, + self._validate_position_sizing_intelligence, + self._validate_market_risk_adaptation, + + # Layer 3: Execution safety + self._validate_execution_safety_measures, + self._validate_slippage_and_spread_protection, + self._validate_rate_limiting_intelligence, + + # Layer 4: AI behavior validation + self._validate_ai_decision_boundaries, + self._validate_strategy_coherence, + self._validate_learning_parameters, + + # Layer 5: System integrity + self._validate_system_resource_limits, + self._validate_data_integrity_checks, + self._validate_recovery_mechanisms, + + # Layer 6: Behavioral anomaly detection + self._validate_behavioral_anomalies, + self._validate_configuration_stability, + + # Layer 7: Regulatory and compliance + self._validate_regulatory_compliance, + self._validate_audit_trail_completeness + ] + + for layer in validation_layers: + try: + results = layer(config) + if results: + self.validation_results.extend(results if isinstance(results, list) else [results]) + diagnostic_info['total_checks_performed'] += len(results) if isinstance(results, list) else 1 + except Exception as e: + self.logger.error(f"Validation layer {layer.__name__} failed: {e}") + self.validation_results.append(ValidationResult( + rule_name=f"System Error: {layer.__name__}", + category=ValidationCategory.SYSTEM_INTEGRITY, + severity=ValidationSeverity.ERROR, + passed=False, + message=f"Validation layer crashed: {str(e)}", + recommended_action="Check system integrity and restart validation" + )) + + # Aggregate risk assessment + diagnostic_info['risk_assessment'] = self._calculate_aggregate_risk() + + # Determine overall safety + fatal_failures = [r for r in 
self.validation_results if r.severity == ValidationSeverity.FATAL and not r.passed] + critical_failures = [r for r in self.validation_results if r.severity == ValidationSeverity.CRITICAL and not r.passed] + + is_safe = len(fatal_failures) == 0 and len(critical_failures) < 3 # Allow up to 2 critical issues + + # Log comprehensive results + self._log_validation_summary(is_safe, fatal_failures, critical_failures) + + # Update validation history + self.validation_history.append((datetime.now(), diagnostic_info['config_hash'], is_safe)) + + return is_safe, self.validation_results, diagnostic_info + + def _validate_fatal_safety_violations(self, config: Dict[str, Any]) -> List[ValidationResult]: + """Fatal safety checks that block execution completely""" + results = [] + + # Zero position sizing (complete trading lockout) + global_config = config.get('global', {}) + position_cap = global_config.get('position_cap_pct', 0) + + if position_cap <= 0: + results.append(ValidationResult( + rule_name="Fatal: Zero Position Sizing", + category=ValidationCategory.POSITION_SIZING, + severity=ValidationSeverity.FATAL, + passed=False, + message="Position cap is zero or negative - trading is impossible", + details="This configuration would prevent all trading operations", + affected_params=['global.position_cap_pct'], + recommended_action="Set position_cap_pct to a positive value (recommended: 1.0-5.0%)", + risk_score=10.0 + )) + + # Inverted stop loss logic + strategy_config = config.get('strategy', {}) + stop_loss = strategy_config.get('stop_loss', {}) + max_loss_pct = stop_loss.get('max_loss_pct', 0) + + if max_loss_pct <= 0: + results.append(ValidationResult( + rule_name="Fatal: No Stop Loss Protection", + category=ValidationCategory.RISK_MANAGEMENT, + severity=ValidationSeverity.FATAL, + passed=False, + message="Stop loss is disabled - unlimited loss potential", + details="Without stop losses, positions can lose 100% of capital", + 
affected_params=['strategy.stop_loss.max_loss_pct'], + recommended_action="Enable stop losses with max_loss_pct between 1-10%", + risk_score=10.0 + )) + + # Suicidal position sizing + if position_cap > 80.0: + results.append(ValidationResult( + rule_name="Fatal: Suicidal Position Sizing", + category=ValidationCategory.POSITION_SIZING, + severity=ValidationSeverity.FATAL, + passed=False, + message=f"Position cap {position_cap}% exceeds safe limits", + details="Single positions above 80% of equity can cause total account loss", + affected_params=['global.position_cap_pct'], + recommended_action="Reduce position_cap_pct to maximum 20% for safety", + risk_score=9.5 + )) + + return results + + def _validate_critical_risk_management(self, config: Dict[str, Any]) -> List[ValidationResult]: + """Critical risk management validation with context awareness""" + results = [] + + global_config = config.get('global', {}) + risk_config = config.get('risk_management', {}) + + # Dynamic drawdown limits based on market conditions + daily_drawdown = global_config.get('daily_drawdown_pct', 0) + volatility_adjusted_limit = self._calculate_volatility_adjusted_drawdown_limit() + + if daily_drawdown > volatility_adjusted_limit: + results.append(ValidationResult( + rule_name="Context-Aware Drawdown Validation", + category=ValidationCategory.RISK_MANAGEMENT, + severity=ValidationSeverity.CRITICAL, + passed=False, + message=f"Drawdown limit {daily_drawdown}% exceeds volatility-adjusted safe limit of {volatility_adjusted_limit:.1f}%", + details=f"Current market volatility: {self.context.market_volatility:.2f}", + affected_params=['global.daily_drawdown_pct'], + recommended_action=f"Reduce to {volatility_adjusted_limit:.1f}% or lower for current market conditions", + risk_score=8.0, + context_sensitive=True + )) + + # Portfolio correlation intelligence + correlation_limit = risk_config.get('correlation_limit', 1.0) + max_positions = risk_config.get('max_concurrent_positions', 1) + + # If many 
positions are allowed, correlation limits become critical + if max_positions > 5 and correlation_limit > 0.7: + results.append(ValidationResult( + rule_name="Correlation-Position Size Mismatch", + category=ValidationCategory.RISK_MANAGEMENT, + severity=ValidationSeverity.CRITICAL, + passed=False, + message=f"High position count ({max_positions}) with loose correlation limit ({correlation_limit})", + details="Many correlated positions can move together during market stress", + affected_params=['risk_management.correlation_limit', 'risk_management.max_concurrent_positions'], + recommended_action="Reduce correlation_limit to 0.5 or limit positions to 3-5", + risk_score=7.5 + )) + + return results + + def _validate_position_sizing_intelligence(self, config: Dict[str, Any]) -> List[ValidationResult]: + """Intelligent position sizing validation with behavioral analysis""" + results = [] + + global_config = config.get('global', {}) + risk_config = config.get('risk_management', {}) + + position_cap = global_config.get('position_cap_pct', 0) + portfolio_cap = global_config.get('portfolio_cap_pct', 0) + kelly_cap = risk_config.get('kelly_fraction_cap', 0.25) + + # Kelly fraction vs position size coherence + theoretical_max_kelly_position = kelly_cap * 100 # Convert to percentage + + if position_cap > theoretical_max_kelly_position * 2: + results.append(ValidationResult( + rule_name="Kelly-Position Size Incoherence", + category=ValidationCategory.POSITION_SIZING, + severity=ValidationSeverity.ERROR, + passed=False, + message=f"Position cap {position_cap}% exceeds 2x Kelly-optimal size {theoretical_max_kelly_position:.1f}%", + details="This configuration ignores mathematical position sizing principles", + affected_params=['global.position_cap_pct', 'risk_management.kelly_fraction_cap'], + recommended_action=f"Align position cap with Kelly fraction: max {theoretical_max_kelly_position:.1f}%", + risk_score=6.0 + )) + + # Portfolio heat analysis + max_positions = 
risk_config.get('max_concurrent_positions', 1) + theoretical_max_exposure = position_cap * max_positions + + if theoretical_max_exposure > portfolio_cap * 1.5: + results.append(ValidationResult( + rule_name="Portfolio Heat Overload", + category=ValidationCategory.POSITION_SIZING, + severity=ValidationSeverity.ERROR, + passed=False, + message=f"Max theoretical exposure {theoretical_max_exposure:.1f}% exceeds portfolio cap {portfolio_cap}%", + details=f"With {max_positions} positions at {position_cap}% each, portfolio can be overexposed", + affected_params=['global.position_cap_pct', 'global.portfolio_cap_pct', 'risk_management.max_concurrent_positions'], + recommended_action="Reduce position sizes or limit concurrent positions", + risk_score=7.0 + )) + + return results + + def _validate_market_risk_adaptation(self, config: Dict[str, Any]) -> List[ValidationResult]: + """Market-adaptive risk validation""" + results = [] + + safety_config = config.get('safety', {}) + pre_trade_checks = safety_config.get('pre_trade_checks', {}) + + # Volatility-aware trading limits + max_volatility = pre_trade_checks.get('max_volatility_pct', 100.0) + current_vol = self.context.market_volatility * 100 # Convert to percentage + + if max_volatility > current_vol * 3 and current_vol > 0.2: # If allowing 3x current volatility + results.append(ValidationResult( + rule_name="Excessive Volatility Tolerance", + category=ValidationCategory.MARKET_CONDITIONS, + severity=ValidationSeverity.WARNING, + passed=False, + message=f"Max volatility {max_volatility}% is 3x higher than current market volatility {current_vol:.1f}%", + details="System may trade in extremely volatile conditions", + affected_params=['safety.pre_trade_checks.max_volatility_pct'], + recommended_action=f"Consider reducing to {current_vol * 2:.1f}% for safer operation", + risk_score=4.0, + context_sensitive=True + )) + + # Liquidity requirements vs market conditions + min_liquidity = pre_trade_checks.get('min_liquidity_usd', 0) + 
if min_liquidity < 100000: # Less than $100k daily volume + results.append(ValidationResult( + rule_name="Insufficient Liquidity Requirements", + category=ValidationCategory.MARKET_CONDITIONS, + severity=ValidationSeverity.WARNING, + passed=False, + message=f"Minimum liquidity ${min_liquidity:,.0f} may be insufficient", + details="Low liquidity can cause high slippage and poor execution", + affected_params=['safety.pre_trade_checks.min_liquidity_usd'], + recommended_action="Increase minimum liquidity to $500,000 or higher", + risk_score=5.0 + )) + + return results + + def _validate_execution_safety_measures(self, config: Dict[str, Any]) -> List[ValidationResult]: + """Advanced execution safety validation""" + results = [] + + global_config = config.get('global', {}) + exchange_config = config.get('exchange', {}) + + # Slippage vs spread coherence + max_spread = global_config.get('max_spread_pct', 0) + slippage_cap = global_config.get('slippage_cap_pct', 0) + + if slippage_cap > max_spread * 2: + results.append(ValidationResult( + rule_name="Slippage-Spread Incoherence", + category=ValidationCategory.EXECUTION_SAFETY, + severity=ValidationSeverity.ERROR, + passed=False, + message=f"Slippage cap {slippage_cap}% exceeds 2x spread limit {max_spread}%", + details="High slippage tolerance relative to spread indicates poor execution planning", + affected_params=['global.max_spread_pct', 'global.slippage_cap_pct'], + recommended_action=f"Align slippage cap to max {max_spread * 1.5:.2f}%", + risk_score=5.5 + )) + + # Rate limiting intelligence + rate_limit = exchange_config.get('rate_limit_per_min', 0) + max_positions = config.get('risk_management', {}).get('max_concurrent_positions', 1) + + # If many positions but low rate limits, system may be throttled + required_rate = max_positions * 10 # Rough estimate: 10 API calls per position management + if rate_limit < required_rate and rate_limit > 0: + results.append(ValidationResult( + rule_name="Rate Limit vs Position Count 
Mismatch", + category=ValidationCategory.EXECUTION_SAFETY, + severity=ValidationSeverity.WARNING, + passed=False, + message=f"Rate limit {rate_limit}/min may be insufficient for {max_positions} positions", + details=f"Estimated requirement: {required_rate} calls/min for proper position management", + affected_params=['exchange.rate_limit_per_min', 'risk_management.max_concurrent_positions'], + recommended_action=f"Increase rate limit to {required_rate} or reduce position count", + risk_score=3.0 + )) + + return results + + def _validate_slippage_and_spread_protection(self, config: Dict[str, Any]) -> List[ValidationResult]: + """Advanced slippage and spread validation""" + results = [] + + global_config = config.get('global', {}) + max_spread = global_config.get('max_spread_pct', 0) + slippage_cap = global_config.get('slippage_cap_pct', 0) + + # Network latency impact on slippage + if self.context.network_latency_ms > 200 and slippage_cap < 0.5: + results.append(ValidationResult( + rule_name="Latency-Slippage Mismatch", + category=ValidationCategory.EXECUTION_SAFETY, + severity=ValidationSeverity.WARNING, + passed=False, + message=f"High network latency ({self.context.network_latency_ms:.0f}ms) but tight slippage cap {slippage_cap}%", + details="High latency increases slippage risk, may cause execution failures", + affected_params=['global.slippage_cap_pct'], + recommended_action=f"Consider increasing slippage cap to 1.0% for high-latency conditions", + risk_score=4.0, + context_sensitive=True + )) + + return results + + def _validate_rate_limiting_intelligence(self, config: Dict[str, Any]) -> List[ValidationResult]: + """Intelligent rate limiting validation""" + results = [] + + exchange_config = config.get('exchange', {}) + ai_config = config.get('ai_planner', {}) + + rate_limit = exchange_config.get('rate_limit_per_min', 1200) + analysis_interval = config.get('system', {}).get('analysis_interval_seconds', 300) + + # Calculate API usage intensity + if 
analysis_interval > 0: + cycles_per_hour = 3600 / analysis_interval + api_calls_per_cycle = 5 # Conservative estimate + hourly_api_usage = cycles_per_hour * api_calls_per_cycle + + if hourly_api_usage > rate_limit * 0.8: # Using >80% of rate limit + results.append(ValidationResult( + rule_name="API Rate Limit Saturation Risk", + category=ValidationCategory.EXECUTION_SAFETY, + severity=ValidationSeverity.WARNING, + passed=False, + message=f"Estimated API usage {hourly_api_usage:.0f}/hr approaches rate limit {rate_limit}/hr", + details=f"Analysis interval {analysis_interval}s may cause rate limit issues", + affected_params=['exchange.rate_limit_per_min', 'system.analysis_interval_seconds'], + recommended_action="Increase analysis interval or request higher rate limits", + risk_score=3.5 + )) + + return results + + def _validate_ai_decision_boundaries(self, config: Dict[str, Any]) -> List[ValidationResult]: + """AI decision boundary validation""" + results = [] + + ai_config = config.get('ai_planner', {}) + budget_cap = ai_config.get('budget_cap_pct', 0) + confidence_threshold = ai_config.get('confidence_threshold', 0.7) + mode = ai_config.get('mode', 'manual_only') + + # AI autonomy vs confidence requirements + if mode in ['auto', 'recommend_then_auto'] and confidence_threshold < 0.6: + results.append(ValidationResult( + rule_name="Low Confidence Autonomous Trading", + category=ValidationCategory.AI_BEHAVIOR, + severity=ValidationSeverity.CRITICAL, + passed=False, + message=f"Autonomous mode with low confidence threshold {confidence_threshold}", + details="AI may make poor quality trades with low confidence requirements", + affected_params=['ai_planner.confidence_threshold', 'ai_planner.mode'], + recommended_action="Increase confidence threshold to 0.65+ for autonomous operation", + risk_score=7.0 + )) + + # Budget cap vs system performance + if budget_cap > 50.0 and len(self.context.recent_performance) > 0: + avg_performance = 
statistics.mean(self.context.recent_performance)
        if avg_performance < 0:  # Negative recent performance
            results.append(ValidationResult(
                rule_name="High AI Budget Despite Poor Performance",
                category=ValidationCategory.AI_BEHAVIOR,
                severity=ValidationSeverity.ERROR,
                passed=False,
                message=f"AI controls {budget_cap}% of portfolio despite negative recent performance",
                details=f"Recent average performance: {avg_performance:.2f}%",
                affected_params=['ai_planner.budget_cap_pct'],
                recommended_action="Reduce AI budget cap during poor performance periods",
                risk_score=6.5,
                context_sensitive=True
            ))

        return results

    def _validate_strategy_coherence(self, config: Dict[str, Any]) -> List[ValidationResult]:
        """Strategy coherence and parameter validation.

        Checks every entry in strategy.params against its declared
        strategy.bounds (min/max); each out-of-bounds parameter yields
        one ERROR-severity ValidationResult.
        """
        results = []

        strategy_config = config.get('strategy', {})
        params = strategy_config.get('params', {})
        bounds = strategy_config.get('bounds', {})

        # Parameter bounds coherence: parameters with no declared bounds
        # are skipped; min/max are each optional within a bound entry.
        for param_name, param_value in params.items():
            if param_name in bounds:
                bound_info = bounds[param_name]
                min_val = bound_info.get('min')
                max_val = bound_info.get('max')

                if min_val is not None and param_value < min_val:
                    results.append(ValidationResult(
                        rule_name=f"Parameter Below Bounds: {param_name}",
                        category=ValidationCategory.AI_BEHAVIOR,
                        severity=ValidationSeverity.ERROR,
                        passed=False,
                        message=f"Parameter {param_name}={param_value} below minimum {min_val}",
                        affected_params=[f'strategy.params.{param_name}'],
                        recommended_action=f"Set {param_name} to at least {min_val}",
                        risk_score=5.0
                    ))

                if max_val is not None and param_value > max_val:
                    results.append(ValidationResult(
                        rule_name=f"Parameter Above Bounds: {param_name}",
                        category=ValidationCategory.AI_BEHAVIOR,
                        severity=ValidationSeverity.ERROR,
                        passed=False,
                        message=f"Parameter {param_name}={param_value} above maximum {max_val}",
                        affected_params=[f'strategy.params.{param_name}'],
                        recommended_action=f"Set {param_name} to at most {max_val}",
                        risk_score=5.0
                    ))

        return results

    def _validate_learning_parameters(self, config: Dict[str, Any]) -> List[ValidationResult]:
        """Learning and adaptation parameter validation.

        Sanity-checks the loss-streak cooldown configuration: warns when
        the cooldown is hair-trigger (<=1 loss) or so insensitive (>10
        losses) that a losing strategy would run too long.
        """
        results = []

        strategy_config = config.get('strategy', {})
        cooldowns = strategy_config.get('cooldowns', {})

        loss_streak = cooldowns.get('loss_streak', 3)
        cooldown_minutes = cooldowns.get('cooldown_minutes', 60)

        # Cooldown logic validation
        if loss_streak <= 1 and cooldown_minutes > 0:
            results.append(ValidationResult(
                rule_name="Overly Sensitive Cooldown",
                category=ValidationCategory.AI_BEHAVIOR,
                severity=ValidationSeverity.WARNING,
                passed=False,
                message=f"Cooldown triggers after only {loss_streak} loss(es)",
                details="Single losses may trigger unnecessary trading pauses",
                affected_params=['strategy.cooldowns.loss_streak'],
                recommended_action="Set loss_streak to 2-3 for better balance",
                risk_score=2.0
            ))

        if loss_streak > 10:
            results.append(ValidationResult(
                rule_name="Insensitive Loss Streak Detection",
                category=ValidationCategory.AI_BEHAVIOR,
                severity=ValidationSeverity.WARNING,
                passed=False,
                message=f"Cooldown requires {loss_streak} consecutive losses",
                details="System may continue poor strategies too long",
                affected_params=['strategy.cooldowns.loss_streak'],
                recommended_action="Reduce loss_streak to 3-5 for better risk control",
                risk_score=3.0
            ))

        return results

    def _validate_system_resource_limits(self, config: Dict[str, Any]) -> List[ValidationResult]:
        """System resource and operational limits validation.

        Compares a rough log-volume estimate (10 MB/day * retention days)
        against ten log files of max_log_size_mb and warns on overflow.
        """
        results = []

        system_config = config.get('system', {})
        max_log_size = system_config.get('max_log_size_mb', 100)
        log_retention_days = system_config.get('log_retention_days', 30)

        # Log storage requirements
        estimated_daily_logs = 10  # MB per day estimate
        total_log_storage = estimated_daily_logs * log_retention_days

        if total_log_storage > max_log_size * 10:  # More than 10 log files worth
            results.append(ValidationResult(
                rule_name="Insufficient Log Storage Planning",
                category=ValidationCategory.SYSTEM_INTEGRITY,
                severity=ValidationSeverity.WARNING,
                passed=False,
                message=f"Log retention ({log_retention_days} days) may exceed storage limits",
                details=f"Estimated storage need: {total_log_storage}MB vs max file size: {max_log_size}MB",
                affected_params=['system.max_log_size_mb', 'system.log_retention_days'],
                recommended_action="Increase max_log_size_mb or reduce retention period",
                risk_score=2.0
            ))

        return results

    def _validate_data_integrity_checks(self, config: Dict[str, Any]) -> List[ValidationResult]:
        """Data integrity and audit validation.

        Requires the audit trail to be enabled whenever the AI planner
        controls more than 25% of the portfolio.
        """
        results = []

        system_config = config.get('system', {})
        enable_audit_trail = system_config.get('enable_audit_trail', False)

        # Critical systems should have audit trails
        ai_config = config.get('ai_planner', {})
        budget_cap = ai_config.get('budget_cap_pct', 0)

        if budget_cap > 25.0 and not enable_audit_trail:
            results.append(ValidationResult(
                rule_name="Missing Audit Trail for High-Risk AI",
                category=ValidationCategory.REGULATORY_COMPLIANCE,
                severity=ValidationSeverity.ERROR,
                passed=False,
                message=f"AI controls {budget_cap}% of portfolio but audit trail is disabled",
                details="High-autonomy systems require comprehensive audit trails",
                affected_params=['system.enable_audit_trail', 'ai_planner.budget_cap_pct'],
                recommended_action="Enable audit trail for AI systems controlling >25% of portfolio",
                risk_score=6.0
            ))

        return results

    def _validate_recovery_mechanisms(self, config: Dict[str, Any]) -> List[ValidationResult]:
        """Recovery and failsafe mechanism validation.

        Demands a kill switch when the rough exposure estimate
        min(position_cap * 10, portfolio_cap) exceeds 50%.
        """
        results = []

        safety_config = config.get('safety', {})
        enable_kill_switch = safety_config.get('enable_kill_switch', False)

        # High-risk systems must have kill switches
        global_config = config.get('global', {})
        position_cap = global_config.get('position_cap_pct', 0)
        portfolio_cap = global_config.get('portfolio_cap_pct', 0)

        total_risk_exposure = min(position_cap * 10, portfolio_cap)  # Rough risk estimate

        if total_risk_exposure > 50.0 and not enable_kill_switch:
            results.append(ValidationResult(
                rule_name="Missing Kill Switch for High-Risk System",
                category=ValidationCategory.SYSTEM_INTEGRITY,
                severity=ValidationSeverity.CRITICAL,
                passed=False,
                message=f"High risk exposure (~{total_risk_exposure:.0f}%) but no kill switch",
                details="High-risk trading systems require emergency stop mechanisms",
                affected_params=['safety.enable_kill_switch'],
                recommended_action="Enable kill switch for high-risk configurations",
                risk_score=8.0
            ))

        return results

    def _validate_behavioral_anomalies(self, config: Dict[str, Any]) -> List[ValidationResult]:
        """Behavioral anomaly detection using historical patterns.

        Flags churn (>3 config changes in the last 24h, once at least 6
        changes have been recorded) and delegates to the parameter-drift
        check (currently a placeholder).
        """
        results = []

        # Analyze configuration changes
        if len(self.config_change_frequency) > 5:
            recent_changes = len([1 for change_time in self.config_change_frequency
                                  if change_time > datetime.now() - timedelta(hours=24)])

            if recent_changes > 3:  # More than 3 config changes in 24 hours
                results.append(ValidationResult(
                    rule_name="Excessive Configuration Changes",
                    category=ValidationCategory.SYSTEM_INTEGRITY,
                    severity=ValidationSeverity.WARNING,
                    passed=False,
                    message=f"{recent_changes} configuration changes in last 24 hours",
                    details="Frequent config changes may indicate instability or panic adjustments",
                    recommended_action="Allow configuration to stabilize before further changes",
                    risk_score=4.0,
                    context_sensitive=True
                ))

        # Check for parameter drift from baselines
        self._check_parameter_drift(config, results)

        return results

    def _validate_configuration_stability(self, config: Dict[str, Any]) -> List[ValidationResult]:
        """Configuration stability and consistency validation.

        Turns each conflict found by _detect_parameter_conflicts into an
        ERROR-severity result.
        """
        results = []

        # Check for internal parameter conflicts
        conflicts = self._detect_parameter_conflicts(config)

        for conflict in conflicts:
            results.append(ValidationResult(
                rule_name=f"Parameter Conflict: {conflict['name']}",
                category=ValidationCategory.SYSTEM_INTEGRITY,
                severity=ValidationSeverity.ERROR,
                passed=False,
                message=conflict['message'],
                details=conflict['details'],
                affected_params=conflict['params'],
                recommended_action=conflict['solution'],
                risk_score=5.5
            ))

        return results

    def _validate_regulatory_compliance(self, config: Dict[str, Any]) -> List[ValidationResult]:
        """Regulatory compliance validation.

        Warns when a single position may exceed 20% of the portfolio —
        above typical retail sizing conventions.
        """
        results = []

        # Position size limits for retail traders
        global_config = config.get('global', {})
        position_cap = global_config.get('position_cap_pct', 0)

        if position_cap > 20.0:  # Above typical retail limits
            results.append(ValidationResult(
                rule_name="Retail Position Size Limits",
                category=ValidationCategory.REGULATORY_COMPLIANCE,
                severity=ValidationSeverity.WARNING,
                passed=False,
                message=f"Position size {position_cap}% exceeds typical retail trading limits",
                details="Large position sizes may trigger regulatory attention",
                affected_params=['global.position_cap_pct'],
                recommended_action="Consider reducing position sizes for regulatory compliance",
                risk_score=3.0
            ))

        return results

    def _validate_audit_trail_completeness(self, config: Dict[str, Any]) -> List[ValidationResult]:
        """Audit trail completeness validation.

        Automated planner modes ('auto', 'recommend_then_auto') must run
        with the audit trail enabled.
        """
        results = []

        system_config = config.get('system', {})
        log_level = system_config.get('log_level', 'INFO')
        enable_audit_trail = system_config.get('enable_audit_trail', False)

        # Audit requirements for automated systems
        ai_config = config.get('ai_planner', {})
        if ai_config.get('mode') in ['auto', 'recommend_then_auto'] and not enable_audit_trail:
            results.append(ValidationResult(
                rule_name="Insufficient Audit Trail for Automated Trading",
                category=ValidationCategory.REGULATORY_COMPLIANCE,
                severity=ValidationSeverity.ERROR,
                passed=False,
                message="Automated trading without comprehensive audit trail",
                details="Regulatory compliance may require detailed trade logging",
                affected_params=['system.enable_audit_trail', 'ai_planner.mode'],
                recommended_action="Enable audit trail for automated trading systems",
                risk_score=5.0
            ))

        return results

    # Helper methods

    def _calculate_config_hash(self, config: Dict[str, Any]) -> str:
        """Calculate configuration hash for change detection.

        Serializes deterministically (sort_keys, default=str for
        non-JSON types) and keeps the first 16 hex chars of SHA-256 —
        enough for change detection, not for security.
        """
        config_str = json.dumps(config, sort_keys=True, default=str)
        return hashlib.sha256(config_str.encode()).hexdigest()[:16]

    def _track_config_changes(self, config_hash: str):
        """Track configuration change frequency.

        Records a timestamp whenever the hash differs from the previous
        one (first call only seeds the baseline), keeping at most the
        last 50 change timestamps.
        """
        if self.last_config_hash and self.last_config_hash != config_hash:
            self.config_change_frequency.append(datetime.now())
            # Keep only last 50 changes
            if len(self.config_change_frequency) > 50:
                self.config_change_frequency.pop(0)
        self.last_config_hash = config_hash

    def _calculate_volatility_adjusted_drawdown_limit(self) -> float:
        """Calculate volatility-adjusted safe drawdown limit.

        Divides the 5% base limit by a volatility factor clamped to
        [0.5, 2.0], so higher market volatility tightens the limit.
        """
        base_limit = 5.0  # 5% base daily drawdown limit
        volatility_factor = max(0.5, min(2.0, 1.0 + self.context.market_volatility))
        return base_limit / volatility_factor

    def _calculate_aggregate_risk(self) -> Dict[str, Any]:
        """Calculate aggregate risk assessment.

        Sums risk scores of failed results, counts failures per
        severity, and surfaces the top risk categories plus the number
        of context-sensitive failures.
        """
        total_risk_score = sum(r.risk_score for r in self.validation_results if not r.passed)
        risk_count_by_severity = {}

        for severity in ValidationSeverity:
            count = len([r for r in self.validation_results
                         if r.severity == severity and not r.passed])
            risk_count_by_severity[severity.name] = count

        return {
            'total_risk_score': total_risk_score,
            'risk_distribution': risk_count_by_severity,
            'highest_risk_categories': self._get_highest_risk_categories(),
            'context_sensitive_issues': len([r for r in self.validation_results
                                             if r.context_sensitive and not r.passed])
        }

    def _get_highest_risk_categories(self) -> List[str]:
        """Get categories with highest risk scores (top 3, descending)."""
        category_risk = {}
        for result in self.validation_results:
            if not result.passed:
                category = result.category.value
                category_risk[category] = category_risk.get(category, 0) + result.risk_score

        return sorted(category_risk.keys(), key=lambda x: category_risk[x], reverse=True)[:3]

    def _check_parameter_drift(self, config: Dict[str, Any], results: List[ValidationResult]):
        """Check for unusual parameter drift from historical baselines."""
        # This would be implemented with actual historical data
        # For now, just a placeholder structure
        pass

    def _detect_parameter_conflicts(self, config: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Detect internal parameter conflicts.

        Currently detects one conflict: worst-case exposure
        (position_cap * max_concurrent_positions) exceeding the
        portfolio cap by more than a 20% buffer.
        """
        conflicts = []

        # Example: Position size vs portfolio cap conflict
        global_config = config.get('global', {})
        risk_config = config.get('risk_management', {})

        position_cap = global_config.get('position_cap_pct', 0)
        portfolio_cap = global_config.get('portfolio_cap_pct', 0)
        max_positions = risk_config.get('max_concurrent_positions', 1)

        if position_cap * max_positions > portfolio_cap * 1.2:  # 20% buffer
            conflicts.append({
                'name': 'Position-Portfolio Capacity',
                'message': 'Position sizing allows overexposure beyond portfolio cap',
                'details': f'Max exposure: {position_cap * max_positions:.1f}% vs Portfolio cap: {portfolio_cap}%',
                'params': ['global.position_cap_pct', 'global.portfolio_cap_pct', 'risk_management.max_concurrent_positions'],
                'solution': 'Reduce position sizes or limit concurrent positions'
            })

        return conflicts

    def _log_validation_summary(self, is_safe: bool, fatal_failures: List, critical_failures: List):
        """Log comprehensive validation summary (errors on failure, info otherwise)."""
        if not is_safe:
            self.logger.error("=== CONFIGURATION SAFETY VALIDATION FAILED ===")

            if fatal_failures:
self.logger.error(f"FATAL ERRORS ({len(fatal_failures)}):") + for failure in fatal_failures: + self.logger.error(f" - {failure.message}") + + if critical_failures: + self.logger.error(f"CRITICAL ERRORS ({len(critical_failures)}):") + for failure in critical_failures: + self.logger.error(f" - {failure.message}") + else: + warning_count = len([r for r in self.validation_results + if r.severity == ValidationSeverity.WARNING and not r.passed]) + self.logger.info(f"Configuration validation PASSED with {warning_count} warnings") \ No newline at end of file diff --git a/core/pretrade_checklist.py b/core/pretrade_checklist.py new file mode 100644 index 0000000..d752cb1 --- /dev/null +++ b/core/pretrade_checklist.py @@ -0,0 +1,110 @@ +"""Pre-trade Checklist - Mandatory safety validation""" + +import logging +from typing import Dict, Any, Optional, List +from dataclasses import dataclass, field +from datetime import datetime + +@dataclass +class PreTradeChecklistResult: + """Result of pre-trade validation""" + passed: bool + failure_reasons: List[str] = field(default_factory=list) + checks_performed: int = 0 + checks_passed: int = 0 + + +class PreTradeChecker: + """Pre-trade safety checklist validator""" + + def __init__(self, config: Dict[str, Any], market_data_provider, portfolio_manager): + self.config = config + self.market_data = market_data_provider + self.portfolio_manager = portfolio_manager + self.logger = logging.getLogger(__name__) + + def validate_trade(self, trade_request: Dict[str, Any]) -> PreTradeChecklistResult: + """Validate trade against safety checklist""" + checks_performed = 0 + checks_passed = 0 + failure_reasons = [] + + symbol = trade_request.get('symbol') + quantity = trade_request.get('quantity', 0) + side = trade_request.get('side') + price = self.market_data.get_price(symbol) + + # 1. 
Basic validation + checks_performed += 1 + if not all([symbol, quantity > 0, side, price]): + failure_reasons.append("Basic trade information missing or invalid") + else: + checks_passed += 1 + + # 2. Market tradable + checks_performed += 1 + market_info = self.market_data.get_market_info(symbol) + if not market_info or not market_info.get('tradable'): + failure_reasons.append(f"Market {symbol} is not tradable") + else: + checks_passed += 1 + + # 3. Bid-ask spread + checks_performed += 1 + max_spread = self.config.get('global', {}).get('max_spread_pct', 0.5) + spread = self.market_data.get_spread(symbol) + if spread is not None and spread > max_spread: + failure_reasons.append(f"Spread for {symbol} ({spread:.2f}%) exceeds limit ({max_spread:.2f}%)") + else: + checks_passed += 1 + + # 4. Volatility + checks_performed += 1 + max_volatility = self.config.get('safety', {}).get('pre_trade_checks', {}).get('max_volatility_pct', 20.0) + volatility = self.market_data.get_volatility(symbol) + if volatility is not None and volatility > max_volatility: + failure_reasons.append(f"Volatility for {symbol} ({volatility:.2f}%) exceeds limit ({max_volatility:.2f}%)") + else: + checks_passed += 1 + + # 5. Slippage + checks_performed += 1 + slippage_cap = self.config.get('global', {}).get('slippage_cap_pct', 1.0) + estimated_slippage = self.market_data.get_estimated_slippage(symbol, quantity) + if estimated_slippage is not None and estimated_slippage > slippage_cap: + failure_reasons.append(f"Estimated slippage for {symbol} ({estimated_slippage:.2f}%) exceeds limit ({slippage_cap:.2f}%)") + else: + checks_passed += 1 + + # 6. 
Position sizing and portfolio exposure + checks_performed += 1 + position_cap_pct = self.config.get('global', {}).get('position_cap_pct', 2.0) + portfolio_cap_pct = self.config.get('global', {}).get('portfolio_cap_pct', 25.0) + trade_value = quantity * price + portfolio_value = self.portfolio_manager.get_total_portfolio_value() + + if portfolio_value > 0: + if (trade_value / portfolio_value) * 100 > position_cap_pct: + failure_reasons.append(f"Trade value exceeds position cap of {position_cap_pct}%") + else: + current_exposure = self.portfolio_manager.get_current_exposure() + if (current_exposure + trade_value) / portfolio_value * 100 > portfolio_cap_pct: + failure_reasons.append(f"Trade would exceed portfolio exposure cap of {portfolio_cap_pct}%") + else: + checks_passed += 1 + else: + checks_passed += 1 # Cannot check exposure if portfolio value is 0 + + # 7. Data staleness + checks_performed += 1 + last_update_time = self.market_data.get_last_update_time(symbol) + if last_update_time and (datetime.now() - last_update_time).total_seconds() > 300: + failure_reasons.append(f"Market data for {symbol} is stale") + else: + checks_passed += 1 + + passed = len(failure_reasons) == 0 + if not passed: + self.logger.warning(f"Pre-trade check failed for {symbol}: {failure_reasons}") + + return PreTradeChecklistResult(passed, failure_reasons, checks_performed, checks_passed) diff --git a/core/profit_protection.py b/core/profit_protection.py new file mode 100644 index 0000000..d547593 --- /dev/null +++ b/core/profit_protection.py @@ -0,0 +1,48 @@ +"""Profit Protection Manager - Trailing stops to lock in profits.""" + +import logging +from typing import Dict, Any + +class ProfitProtectionManager: + def __init__(self, config: Dict[str, Any], trade_executor): + self.config = config.get('safety', {}).get('profit_protection', {}) + self.trade_executor = trade_executor + self.logger = logging.getLogger(__name__) + self.is_running = False + self.trailing_stops: Dict[str, float] = {} 
+ + def start(self) -> bool: + self.is_running = True + self.logger.info("Profit Protection Manager started.") + return True + + def stop(self) -> None: + self.is_running = False + self.logger.info("Profit Protection Manager stopped.") + + def update_position(self, symbol: str, quantity: float, entry_price: float, current_price: float) -> None: + """Update position and check for trailing stop adjustments.""" + if not self.is_running or not self.config.get('enabled'): + return + + pnl_percent = ((current_price - entry_price) / entry_price) * 100 + lock_threshold = self.config.get('lock_threshold_pct', 5.0) + trail_distance = self.config.get('trail_distance_pct', 2.0) + + # Activate trailing stop if profit threshold is met + if symbol not in self.trailing_stops and pnl_percent >= lock_threshold: + self.trailing_stops[symbol] = current_price * (1 - (trail_distance / 100)) + self.logger.info(f"Profit protection activated for {symbol} at ${self.trailing_stops[symbol]:.2f}") + + # Update trailing stop if active and price increases + if symbol in self.trailing_stops: + new_stop = current_price * (1 - (trail_distance / 100)) + if new_stop > self.trailing_stops[symbol]: + self.trailing_stops[symbol] = new_stop + self.logger.info(f"Trailing stop for {symbol} updated to ${new_stop:.2f}") + + # Check if trailing stop is hit + if current_price <= self.trailing_stops[symbol]: + self.logger.info(f"Profit protection triggered for {symbol} at ${current_price:.2f}. 
Selling position.") + self.trade_executor.execute_trade(symbol, 'sell', quantity) + del self.trailing_stops[symbol] diff --git a/core/support_bundle.py b/core/support_bundle.py new file mode 100644 index 0000000..64d9a37 --- /dev/null +++ b/core/support_bundle.py @@ -0,0 +1,58 @@ +"""Support Bundle Generator - Stub Implementation""" + +import logging +import zipfile +from pathlib import Path +from datetime import datetime +import re + +def sanitize_config(config_content: str) -> str: + """Remove sensitive information from config file""" + # Remove API keys and secrets + sanitized_content = re.sub(r'api_key\s*=\s*".*"\n', 'api_key = "REDACTED"\n', config_content) + sanitized_content = re.sub(r'api_secret\s*=\s*".*"\n', 'api_secret = "REDACTED"\n', sanitized_content) + sanitized_content = re.sub(r'api_passphrase\s*=\s*".*"\n', 'api_passphrase = "REDACTED"\n', sanitized_content) + + # Remove Telegram and email credentials + sanitized_content = re.sub(r'telegram_bot_token\s*=\s*".*"\n', 'telegram_bot_token = "REDACTED"\n', sanitized_content) + sanitized_content = re.sub(r'smtp_password\s*=\s*".*"\n', 'smtp_password = "REDACTED"\n', sanitized_content) + + return sanitized_content + +def create_support_bundle() -> Path: + """Create support bundle for troubleshooting""" + logger = logging.getLogger(__name__) + + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + bundle_path = Path(f"support_bundle_{timestamp}.zip") + + try: + with zipfile.ZipFile(bundle_path, 'w') as zf: + # Add system info + zf.writestr("system_info.txt", "Crypto Collab Professional v1.0.0\nWindows System") + + # Add config (sanitized) + try: + config_path = Path("config/default.toml") + if config_path.exists(): + config_content = config_path.read_text() + sanitized_config = sanitize_config(config_content) + zf.writestr("config_sanitized.toml", sanitized_config) + except Exception as e: + logger.error(f"Could not add sanitized config to support bundle: {e}") + + # Add recent logs + log_dir = 
Path("logs") + if log_dir.exists(): + for log_file in log_dir.glob("*.log"): + try: + zf.write(log_file, f"logs/{log_file.name}") + except: + pass + + logger.info(f"Support bundle created: {bundle_path}") + return bundle_path + + except Exception as e: + logger.error(f"Failed to create support bundle: {e}") + raise