diff --git a/README.md b/README.md index b1a89b4..aec10f0 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,9 @@ pip install -r requirements.txt # Option 2: Install as package (recommended) pip install -e . +# Explore flags +python -m xposure --help + # Run a scan python -m xposure example.com @@ -65,6 +68,16 @@ python -m xposure example.com python -m xposure example.com -o results.json ``` +### 🎛️ Live Dashboard (Mr. Robot mode) + +The default run launches a neon console dashboard inspired by *Mr. Robot*: + +- Glitchy banner + status ticker +- Live telemetry (recon counts, candidate totals, verification results) +- Phase-aware updates as the engine moves from discovery → extraction → correlation → verification + +Prefer silent mode? Add `-q/--quiet` to stream minimal output, or `--no-verify` if you only want passive checks. + --- ## ✨ Features diff --git a/test_verification.py b/test_verification.py index f918302..9402a7f 100644 --- a/test_verification.py +++ b/test_verification.py @@ -112,12 +112,16 @@ def test_verifier_routing(): print(f"\nAll supported types: {coordinator.get_supported_types()}") -async def test_verification_structure(): +def test_verification_structure(): """Test verification structure with sample findings.""" print("\n" + "=" * 70) print("TEST: Verification Structure") print("=" * 70) + asyncio.run(_verify_structure()) + + +async def _verify_structure(): # Create sample findings findings = [ Finding( @@ -188,12 +192,16 @@ async def test_verification_structure(): print(f" Unverified: {stats['unverified']}") -async def test_aws_signature(): +def test_aws_signature(): """Test AWS signature generation (structure only).""" print("\n" + "=" * 70) print("TEST: AWS Signature Structure") print("=" * 70) + asyncio.run(_aws_signature()) + + +async def _aws_signature(): from xposure.verify.aws import AWSVerifier verifier = AWSVerifier() @@ -246,8 +254,8 @@ async def main(): """Run all tests.""" test_passive_verification() test_verifier_routing() - await test_verification_structure() - await test_aws_signature() + await _verify_structure() + await _aws_signature() test_github_token_types() print("\n" + "=" * 70) diff --git a/xposure/cli.py b/xposure/cli.py index 496c21b..ea6253c 100644 --- a/xposure/cli.py +++ b/xposure/cli.py @@ -18,7 +18,7 @@ def print_banner(compact: bool = False): print(banner) -@click.command() +@click.command(context_settings={"help_option_names": ["-h", "--help"]}) @click.argument('target', required=False) @click.option('--github-token', '-g', envvar='GITHUB_TOKEN', help='GitHub token for dorking') @click.option('--output', '-o', type=click.Path(), help='Output file (JSON)') diff --git a/xposure/core/engine.py b/xposure/core/engine.py index 8b1381a..28c4769 100644 --- a/xposure/core/engine.py +++ b/xposure/core/engine.py @@ -65,6 +65,7 @@ def __init__( # Store all candidates for correlation self.all_candidates = [] self.findings = [] + self.dashboard = None async def run_quiet(self): """Run scan in quiet mode (no live dashboard).""" @@ -90,12 +91,13 @@ async def run_with_dashboard(self): dashboard = LiveDashboard(self.config, self.state, self.stats) try: + self.dashboard = dashboard await dashboard.start() await self._run_scan() except KeyboardInterrupt: print("\n[x-posure] scan interrupted") finally: - dashboard.stop() + await dashboard.stop() self._finalize() async def _run_scan(self): @@ -104,20 +106,27 @@ async def _run_scan(self): print(f"[x-posure] scan_id: {self.scan_id}") # 1. Discovery Phase + self._update_dashboard("discovery", "Mapping the surface") discovered_content = await self._discovery_phase() # 2. Extraction Phase + self._update_dashboard("extraction", "Harvesting signals") await self._extraction_phase(discovered_content) # 3. Correlation Phase + self._update_dashboard("correlation", "Linking intel") await self._correlation_phase() # 4. Verification Phase if self.config.verify: + self._update_dashboard("verification", "Trust but verify") await self._verification_phase() + else: + self._update_dashboard("complete", "Verification skipped (user choice)") # Update stats self.stats.end_time = datetime.now() + self._update_dashboard("complete", "Scan finished") async def _discovery_phase(self) -> dict: """Run discovery modules to find attack surface.""" @@ -555,6 +564,7 @@ async def _correlation_phase(self): # 1. Deduplicate candidates into findings unique_findings = [] + context_scores: dict[str, float] = {} for candidate in self.all_candidates: finding, is_new = self.deduplicator.add_or_merge(candidate) @@ -564,6 +574,13 @@ async def _correlation_phase(self): # Track in graph self.graph.track_finding(finding) + # Track strongest context signal for this finding + context_score = self.scorer.analyze_snippet_context(candidate.context) + if finding.id not in context_scores: + context_scores[finding.id] = context_score + else: + context_scores[finding.id] = max(context_scores[finding.id], context_score) + if not self.config.quiet: print(f"[dedup] {len(self.all_candidates)} candidates -> {len(unique_findings)} unique findings") @@ -608,7 +625,7 @@ async def _correlation_phase(self): final_score = self.scorer.calculate_score( finding=finding, is_paired=is_paired, - context_quality=0.7, # Default context quality + context_quality=context_scores.get(finding.id, 0.7), ) # Update finding confidence @@ -745,3 +762,12 @@ def export_json(self, output_file: str): output_file: Path to output file """ self.state.export(Path(output_file)) + + def _update_dashboard(self, phase: str, detail: str = ""): + """Send phase updates to the live dashboard if enabled.""" + if self.dashboard: + try: + self.dashboard.set_phase(phase, detail) + except Exception: + # Dashboard issues shouldn't break the scan + pass diff --git a/xposure/core/models.py b/xposure/core/models.py index 800d20c..df851c2 100644 --- a/xposure/core/models.py +++ b/xposure/core/models.py @@ -59,6 +59,13 @@ class Candidate: entropy: float context: str # surrounding code confidence: float = 0.0 + severity: Optional[Severity] = None + rule_id: Optional[str] = None + rule_name: Optional[str] = None + metadata: dict = field(default_factory=dict) + verifier: Optional[str] = None + remediation: Optional[str] = None + position: Optional[int] = None paired_with: Optional['Candidate'] = None def to_dict(self) -> dict: @@ -70,6 +77,13 @@ def to_dict(self) -> dict: "entropy": self.entropy, "context": self.context, "confidence": self.confidence, + "severity": self.severity.value if isinstance(self.severity, Severity) else self.severity, + "rule_id": self.rule_id, + "rule_name": self.rule_name, + "metadata": self.metadata, + "verifier": self.verifier, + "remediation": self.remediation, + "position": self.position, "paired_with": self.paired_with.type if self.paired_with else None, } diff --git a/xposure/correlate/confidence.py b/xposure/correlate/confidence.py index 3dc15a8..c23b476 100644 --- a/xposure/correlate/confidence.py +++ b/xposure/correlate/confidence.py @@ -223,6 +223,46 @@ def analyze_context_quality(self, content: str, position: int, value: str) -> fl return max(0.0, min(1.0, score)) + def analyze_snippet_context(self, context: str) -> float: + """ + Score a small snippet of context without requiring exact positions. + + This is useful when only a trimmed context window is available (e.g. regex + matches). The scoring uses the same keyword heuristics as analyze_context_quality + but is resilient to missing positional information. + """ + if not context: + return 0.0 + + lowered = context.lower() + score = 0.4 # baseline for having any context + + positive_keywords = [ + 'key', 'token', 'secret', 'password', 'credential', + 'auth', 'api', 'access', 'private', 'config', + 'env', 'production', 'prod', 'live', 'client_id', + 'client_secret', 'aws', 'gcp', 'azure', 'slack', + 'stripe', 'github', 'gitlab' + ] + negative_keywords = [ + 'example', 'test', 'demo', 'sample', 'placeholder', + 'fake', 'mock', 'dummy', 'xxx', 'todo', 'spec' + ] + + positive_hits = sum(1 for kw in positive_keywords if kw in lowered) + negative_hits = sum(1 for kw in negative_keywords if kw in lowered) + + score += min(0.35, positive_hits * 0.05) + score -= min(0.35, negative_hits * 0.08) + + if any(token in lowered for token in ['=', ':', '->', '"', "'"]): + score += 0.1 + + if any(fragment.isupper() and len(fragment) > 3 for fragment in lowered.split()): + score += 0.05 + + return max(0.0, min(1.0, score)) + def get_confidence_level(self, score: float) -> str: """ Get human-readable confidence level. diff --git a/xposure/correlate/dedup.py b/xposure/correlate/dedup.py index 6c59f90..0eaae20 100644 --- a/xposure/correlate/dedup.py +++ b/xposure/correlate/dedup.py @@ -82,8 +82,25 @@ def _create_finding(self, candidate: Candidate) -> Finding: confidence=candidate.confidence, confidence_factors=[], entropy=candidate.entropy, + severity=candidate.severity, + metadata=candidate.metadata.copy() if candidate.metadata else {}, ) + # Preserve rule metadata and remediation guidance + if candidate.rule_id or candidate.rule_name: + finding.metadata.setdefault("rule", {}) + if candidate.rule_id: + finding.metadata["rule"]["id"] = candidate.rule_id + if candidate.rule_name: + finding.metadata["rule"]["name"] = candidate.rule_name + + if candidate.remediation: + finding.remediation = candidate.remediation + + if candidate.verifier: + finding.metadata.setdefault("verification", {}) + finding.metadata["verification"].setdefault("suggested_verifier", candidate.verifier) + return finding def _merge_candidate(self, finding: Finding, candidate: Candidate): @@ -105,6 +122,24 @@ def _merge_candidate(self, finding: Finding, candidate: Candidate): f"seen in {len(finding.sources)} sources" ) + # Merge severity if existing finding lacks it + if not finding.severity and candidate.severity: + finding.severity = candidate.severity + + # Merge metadata (rule/provider info) + if candidate.metadata: + finding.metadata.update({k: v for k, v in candidate.metadata.items() if k not in finding.metadata}) + + if candidate.rule_id or candidate.rule_name: + finding.metadata.setdefault("rule", {}) + if candidate.rule_id: + finding.metadata["rule"].setdefault("id", candidate.rule_id) + if candidate.rule_name: + finding.metadata["rule"].setdefault("name", candidate.rule_name) + + if candidate.remediation and not finding.remediation: + finding.remediation = candidate.remediation + def _mask_value(self, value: str, visible: int = 8) -> str: """ Mask credential value for safe display. diff --git a/xposure/extract/quick.py b/xposure/extract/quick.py index fea09e5..83d70c8 100644 --- a/xposure/extract/quick.py +++ b/xposure/extract/quick.py @@ -24,6 +24,12 @@ class QuickScanner: 'github_token': r'(?:ghp|gho|ghu|ghs|ghr)_[a-zA-Z0-9]{36,}', 'github_fine': r'github_pat_[a-zA-Z0-9]{22}_[a-zA-Z0-9]{59}', + # Cloud & providers + 'gcp_api_key': r'AIza[0-9A-Za-z\-_]{35}', + 'digitalocean_pat': r'dop_v1_[a-f0-9]{64}', + 'cloudflare_token': r'(?:CFP|CFU|cfp|cfu)[a-zA-Z0-9_-]{30,}', + 'supabase_service_key': r'sb[a-z]{2}_[a-zA-Z0-9]{40,}', + # Slack 'slack_token': r'xox[baprs]-[0-9]{10,13}-[0-9]{10,13}-[a-zA-Z0-9]{24,}', 'slack_webhook': r'https://hooks\.slack\.com/services/T[A-Z0-9]{8,}/B[A-Z0-9]{8,}/[a-zA-Z0-9]{24}', @@ -119,7 +125,8 @@ def scan(self, content: str, source: Source) -> Generator[Candidate, None, None] high_confidence_patterns = { 'github_token', 'github_fine', 'slack_token', 'stripe_key', 'openai_key', 'anthropic_key', - 'aws_access_key' + 'aws_access_key', 'gcp_api_key', 'digitalocean_pat', + 'cloudflare_token', 'supabase_service_key' } if entropy < self.min_entropy and pattern_name not in high_confidence_patterns: @@ -180,7 +187,9 @@ def _initial_confidence(self, pattern_name: str, entropy: float) -> float: high_confidence = { 'github_token', 'github_fine', 'slack_token', 'stripe_key', 'openai_key', 'anthropic_key', - 'aws_access_key', 'slack_webhook' + 'aws_access_key', 'slack_webhook', + 'gcp_api_key', 'digitalocean_pat', 'cloudflare_token', + 'supabase_service_key' } medium_confidence = { diff --git a/xposure/output/console.py b/xposure/output/console.py index 8ab6712..6f34ae7 100644 --- a/xposure/output/console.py +++ b/xposure/output/console.py @@ -1,6 +1,21 @@ """X-POSURE console output and live dashboard.""" -from typing import TYPE_CHECKING +import asyncio +from datetime import datetime +from typing import TYPE_CHECKING, Optional + +from rich import box +from rich.align import Align +from rich.console import Console +from rich.layout import Layout +from rich.live import Live +from rich.panel import Panel +from rich.table import Table +from rich.text import Text +from rich.theme import Theme + +from ..ui.colors import COLORS +from ..ui.banners import BANNER_COMPACT if TYPE_CHECKING: from ..config import Config @@ -9,25 +24,174 @@ class LiveDashboard: - """Live dashboard using Rich (placeholder for now).""" + """Rich-powered live dashboard with a neon Mr. Robot vibe.""" - def __init__(self, config: 'Config', state: 'ScanState', stats: 'ScanStats'): + def __init__(self, config: "Config", state: "ScanState", stats: "ScanStats"): """Initialize dashboard.""" self.config = config self.state = state self.stats = stats self.running = False + self.phase: str = "initializing" + self.phase_detail: str = "Booting up nodes..." + self.recent_events: list[str] = [] + self._live: Optional[Live] = None + self._refresh_task: Optional[asyncio.Task] = None + self._stop_event = asyncio.Event() + + theme = Theme( + { + "mrrobot.primary": COLORS["toxic"], + "mrrobot.alert": COLORS["blood"], + "mrrobot.dim": COLORS["smoke"], + "mrrobot.banner": "bold " + COLORS["toxic"], + } + ) + self.console = Console(theme=theme) async def start(self): """Start the live dashboard.""" self.running = True - # Will be implemented in session 7 with Rich + self._stop_event.clear() - def stop(self): + self._live = Live( + self._render(), + console=self.console, + refresh_per_second=6, + screen=True, + ) + self._live.start() + self._refresh_task = asyncio.create_task(self._refresh_loop()) + + async def stop(self): """Stop the live dashboard.""" self.running = False + self._stop_event.set() + if self._refresh_task: + await asyncio.wait({self._refresh_task}, return_when=asyncio.ALL_COMPLETED) + if self._live: + self._live.stop() + self._live = None + + def set_phase(self, phase: str, detail: str = ""): + """Update the current phase and detail message.""" + self.phase = phase + if detail: + self.phase_detail = detail + self._push_event(f"[{phase.upper()}] {detail or '...'}") + + def _push_event(self, message: str): + """Append an event to the ticker.""" + timestamp = datetime.now().strftime("%H:%M:%S") + self.recent_events.append(f"{timestamp} {message}") + self.recent_events = self.recent_events[-6:] + + async def _refresh_loop(self): + """Continuously refresh the dashboard while running.""" + while not self._stop_event.is_set(): + if self._live: + self._live.update(self._render()) + await asyncio.sleep(0.4) + + def _render(self) -> Layout: + """Build the Rich layout for the dashboard.""" + layout = Layout() + layout.split_column( + Layout(name="header", size=5), + Layout(name="body"), + Layout(name="footer", size=6), + ) + + layout["body"].split_row( + Layout(name="phase", ratio=2), + Layout(name="stats", ratio=3), + Layout(name="events", ratio=2), + ) + + layout["header"].update(self._render_header()) + layout["phase"].update(self._render_phase()) + layout["stats"].update(self._render_stats()) + layout["events"].update(self._render_events()) + layout["footer"].update(self._render_footer()) + + return layout + + def _render_header(self) -> Panel: + """Render the ASCII banner.""" + banner_text = Text.from_ansi(BANNER_COMPACT) + banner = Align.center(banner_text, vertical="middle") + return Panel( + banner, + box=box.SQUARE, + border_style="mrrobot.primary", + subtitle="live // infiltration dashboard", + ) + + def _render_phase(self) -> Panel: + """Render current phase info.""" + glitch = Text("▌ X-POSURE : OPERATION ▐", style="mrrobot.primary") + phase_text = Text(self.phase.upper(), style="bold " + COLORS["blood"]) + detail_text = Text(self.phase_detail or "Running...", style="mrrobot.dim") + + table = Table.grid(expand=True) + table.add_row(glitch) + table.add_row("") + table.add_row(Text("phase", style="mrrobot.dim"), phase_text) + table.add_row(Text("detail", style="mrrobot.dim"), detail_text) + table.add_row(Text("target", style="mrrobot.dim"), Text(self.config.target, style="mrrobot.primary")) + + return Panel( + table, + title="mr.robot // status", + border_style="mrrobot.primary", + box=box.ROUNDED, + ) + + def _render_stats(self) -> Panel: + """Render stats snapshot.""" + stats_table = Table.grid(padding=(0, 1)) + stats_table.add_column("metric", style="mrrobot.dim", justify="right") + stats_table.add_column("value", style="mrrobot.primary") + + stats_table.add_row("subdomains", str(self.stats.subdomains_found)) + stats_table.add_row("js files", str(self.stats.js_files_found)) + stats_table.add_row("paths", str(len(getattr(self.state, 'seen_urls', [])))) + stats_table.add_row("candidates", str(self.stats.candidates_found)) + stats_table.add_row("paired", str(self.stats.paired_credentials)) + stats_table.add_row("verified", str(self.stats.verified_findings)) + stats_table.add_row("errors", str(self.stats.error_findings)) + + return Panel( + stats_table, + title="signal // telemetry", + border_style="mrrobot.primary", + box=box.ROUNDED, + ) + + def _render_events(self) -> Panel: + """Render recent events ticker.""" + table = Table.grid(expand=True) + for event in reversed(self.recent_events): + table.add_row(Text(event, style="mrrobot.dim")) + + if not self.recent_events: + table.add_row(Text("listening for leaks...", style="mrrobot.dim")) + + return Panel( + table, + title="ticker // operations", + border_style="mrrobot.primary", + box=box.ROUNDED, + ) - def update(self): - """Update the dashboard display.""" - # Will be implemented in session 7 - pass + def _render_footer(self) -> Panel: + """Render footer with friendly reminder.""" + footer_text = Text( + "root@fsociety:~$ reality is insecure — exploit responsibly", + style="mrrobot.primary", + ) + return Panel( + Align.center(footer_text, vertical="middle"), + border_style="mrrobot.primary", + box=box.SQUARE, + ) diff --git a/xposure/rules/engine.py b/xposure/rules/engine.py index e35cd7c..ade43c1 100644 --- a/xposure/rules/engine.py +++ b/xposure/rules/engine.py @@ -2,7 +2,7 @@ from typing import Generator, Optional -from ..core.models import Candidate, Source +from ..core.models import Candidate, Source, Severity from .loader import Rule, RuleLoader @@ -44,6 +44,13 @@ def scan(self, content: str, source: Source) -> Generator[Candidate, None, None] entropy=entropy, context=match['context'], confidence=self._calculate_confidence(match, entropy), + severity=self._parse_severity(match.get('severity')), + rule_id=match.get('rule_id'), + rule_name=match.get('rule_name'), + metadata=match.get('metadata', {}), + verifier=match.get('verifier'), + remediation=match.get('remediation'), + position=match.get('start'), ) # Store additional metadata @@ -76,10 +83,26 @@ def scan_with_rule(self, content: str, rule: Rule, source: Source) -> Generator[ entropy=entropy, context=match['context'], confidence=self._calculate_confidence(match, entropy), + severity=self._parse_severity(match.get('severity')), + rule_id=match.get('rule_id'), + rule_name=match.get('rule_name'), + metadata=match.get('metadata', {}), + verifier=match.get('verifier'), + remediation=match.get('remediation'), + position=match.get('start'), ) yield candidate + def _parse_severity(self, severity: Optional[str]) -> Optional[Severity]: + """Convert severity string to enum when possible.""" + if not severity: + return None + try: + return Severity(severity) + except ValueError: + return None + def get_paired_rules(self, rule_id: str) -> list[Rule]: """ Get rules that should be paired with this rule. @@ -140,7 +163,10 @@ def _calculate_confidence(self, match: dict, entropy: float) -> float: 'info': 0.2, } - base = severity_scores.get(match['severity'], 0.5) + severity_key = match.get('severity') + if isinstance(severity_key, Severity): + severity_key = severity_key.value + base = severity_scores.get(severity_key, 0.5) # Adjust for entropy if entropy > 4.5: diff --git a/xposure/rules/loader.py b/xposure/rules/loader.py index b415f90..534f927 100644 --- a/xposure/rules/loader.py +++ b/xposure/rules/loader.py @@ -103,6 +103,7 @@ def match(self, content: str, context_window: int = 200) -> list[dict]: 'verifier': self.verifier, 'remediation': self.remediation, 'pair_with': self.pair_with, + 'capture_group': self.capture_group, }) return matches