Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,26 @@ pip install -r requirements.txt
# Option 2: Install as package (recommended)
pip install -e .

# Explore flags
python -m xposure --help

# Run a scan
python -m xposure example.com

# Save results to JSON
python -m xposure example.com -o results.json
```

### 🎛️ Live Dashboard (Mr. Robot mode)

The default run launches a neon console dashboard inspired by *Mr. Robot*:

- Glitchy banner + status ticker
- Live telemetry (recon counts, candidate totals, verification results)
- Phase-aware updates as the engine moves from discovery → extraction → correlation → verification

Prefer silent mode? Add `-q/--quiet` to stream minimal output, or `--no-verify` if you only want passive checks.

---

## ✨ Features
Expand Down
16 changes: 12 additions & 4 deletions test_verification.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,12 +112,16 @@ def test_verifier_routing():
print(f"\nAll supported types: {coordinator.get_supported_types()}")


async def test_verification_structure():
def test_verification_structure():
    """Test verification structure with sample findings."""
    rule = "=" * 70
    print(f"\n{rule}")
    print("TEST: Verification Structure")
    print(rule)

    asyncio.run(_verify_structure())


async def _verify_structure():
# Create sample findings
findings = [
Finding(
Expand Down Expand Up @@ -188,12 +192,16 @@ async def test_verification_structure():
print(f" Unverified: {stats['unverified']}")


async def test_aws_signature():
def test_aws_signature():
    """Test AWS signature generation (structure only)."""
    rule = "=" * 70
    print(f"\n{rule}")
    print("TEST: AWS Signature Structure")
    print(rule)

    asyncio.run(_aws_signature())


async def _aws_signature():
from xposure.verify.aws import AWSVerifier

verifier = AWSVerifier()
Expand Down Expand Up @@ -246,8 +254,8 @@ async def main():
"""Run all tests."""
test_passive_verification()
test_verifier_routing()
await test_verification_structure()
await test_aws_signature()
await _verify_structure()
await _aws_signature()
test_github_token_types()

print("\n" + "=" * 70)
Expand Down
2 changes: 1 addition & 1 deletion xposure/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ def print_banner(compact: bool = False):
print(banner)


@click.command()
@click.command(context_settings={"help_option_names": ["-h", "--help"]})
@click.argument('target', required=False)
@click.option('--github-token', '-g', envvar='GITHUB_TOKEN', help='GitHub token for dorking')
@click.option('--output', '-o', type=click.Path(), help='Output file (JSON)')
Expand Down
30 changes: 28 additions & 2 deletions xposure/core/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ def __init__(
# Store all candidates for correlation
self.all_candidates = []
self.findings = []
self.dashboard = None

async def run_quiet(self):
"""Run scan in quiet mode (no live dashboard)."""
Expand All @@ -90,12 +91,13 @@ async def run_with_dashboard(self):
dashboard = LiveDashboard(self.config, self.state, self.stats)

try:
self.dashboard = dashboard
await dashboard.start()
await self._run_scan()
except KeyboardInterrupt:
print("\n[x-posure] scan interrupted")
finally:
dashboard.stop()
await dashboard.stop()
self._finalize()

async def _run_scan(self):
Expand All @@ -104,20 +106,27 @@ async def _run_scan(self):
print(f"[x-posure] scan_id: {self.scan_id}")

# 1. Discovery Phase
self._update_dashboard("discovery", "Mapping the surface")
discovered_content = await self._discovery_phase()

# 2. Extraction Phase
self._update_dashboard("extraction", "Harvesting signals")
await self._extraction_phase(discovered_content)

# 3. Correlation Phase
self._update_dashboard("correlation", "Linking intel")
await self._correlation_phase()

# 4. Verification Phase
if self.config.verify:
self._update_dashboard("verification", "Trust but verify")
await self._verification_phase()
else:
self._update_dashboard("complete", "Verification skipped (user choice)")

# Update stats
self.stats.end_time = datetime.now()
self._update_dashboard("complete", "Scan finished")

async def _discovery_phase(self) -> dict:
"""Run discovery modules to find attack surface."""
Expand Down Expand Up @@ -555,6 +564,7 @@ async def _correlation_phase(self):

# 1. Deduplicate candidates into findings
unique_findings = []
context_scores: dict[str, float] = {}
for candidate in self.all_candidates:
finding, is_new = self.deduplicator.add_or_merge(candidate)

Expand All @@ -564,6 +574,13 @@ async def _correlation_phase(self):
# Track in graph
self.graph.track_finding(finding)

# Track strongest context signal for this finding
context_score = self.scorer.analyze_snippet_context(candidate.context)
if finding.id not in context_scores:
context_scores[finding.id] = context_score
else:
context_scores[finding.id] = max(context_scores[finding.id], context_score)

if not self.config.quiet:
print(f"[dedup] {len(self.all_candidates)} candidates -> {len(unique_findings)} unique findings")

Expand Down Expand Up @@ -608,7 +625,7 @@ async def _correlation_phase(self):
final_score = self.scorer.calculate_score(
finding=finding,
is_paired=is_paired,
context_quality=0.7, # Default context quality
context_quality=context_scores.get(finding.id, 0.7),
)

# Update finding confidence
Expand Down Expand Up @@ -745,3 +762,12 @@ def export_json(self, output_file: str):
output_file: Path to output file
"""
self.state.export(Path(output_file))

def _update_dashboard(self, phase: str, detail: str = ""):
    """Send phase updates to the live dashboard if enabled."""
    dashboard = self.dashboard
    if not dashboard:
        # Quiet mode / dashboard disabled: nothing to update.
        return
    try:
        dashboard.set_phase(phase, detail)
    except Exception:
        # Dashboard issues shouldn't break the scan
        pass
14 changes: 14 additions & 0 deletions xposure/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,13 @@ class Candidate:
entropy: float
context: str # surrounding code
confidence: float = 0.0
severity: Optional[Severity] = None
rule_id: Optional[str] = None
rule_name: Optional[str] = None
metadata: dict = field(default_factory=dict)
verifier: Optional[str] = None
remediation: Optional[str] = None
position: Optional[int] = None
paired_with: Optional['Candidate'] = None

def to_dict(self) -> dict:
Expand All @@ -70,6 +77,13 @@ def to_dict(self) -> dict:
"entropy": self.entropy,
"context": self.context,
"confidence": self.confidence,
"severity": self.severity.value if isinstance(self.severity, Severity) else self.severity,
"rule_id": self.rule_id,
"rule_name": self.rule_name,
"metadata": self.metadata,
"verifier": self.verifier,
"remediation": self.remediation,
"position": self.position,
"paired_with": self.paired_with.type if self.paired_with else None,
}

Expand Down
40 changes: 40 additions & 0 deletions xposure/correlate/confidence.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,46 @@ def analyze_context_quality(self, content: str, position: int, value: str) -> fl

return max(0.0, min(1.0, score))

def analyze_snippet_context(self, context: str) -> float:
    """
    Score a small snippet of context without requiring exact positions.

    This is useful when only a trimmed context window is available (e.g. regex
    matches). The scoring uses the same keyword heuristics as analyze_context_quality
    but is resilient to missing positional information.

    Args:
        context: Surrounding text captured with a candidate match. May be empty.

    Returns:
        A score clamped to [0.0, 1.0]; 0.0 for empty context, otherwise a
        0.4 baseline adjusted by keyword and structure heuristics.
    """
    if not context:
        return 0.0

    lowered = context.lower()
    score = 0.4  # baseline for having any context

    positive_keywords = [
        'key', 'token', 'secret', 'password', 'credential',
        'auth', 'api', 'access', 'private', 'config',
        'env', 'production', 'prod', 'live', 'client_id',
        'client_secret', 'aws', 'gcp', 'azure', 'slack',
        'stripe', 'github', 'gitlab'
    ]
    negative_keywords = [
        'example', 'test', 'demo', 'sample', 'placeholder',
        'fake', 'mock', 'dummy', 'xxx', 'todo', 'spec'
    ]

    positive_hits = sum(1 for kw in positive_keywords if kw in lowered)
    negative_hits = sum(1 for kw in negative_keywords if kw in lowered)

    # Cap the keyword adjustments so a keyword-stuffed snippet can't dominate.
    score += min(0.35, positive_hits * 0.05)
    score -= min(0.35, negative_hits * 0.08)

    # Assignment-like punctuation suggests the value is bound to a name.
    if any(token in lowered for token in ['=', ':', '->', '"', "'"]):
        score += 0.1

    # Shouty identifiers (e.g. API_KEY) hint at config/constant usage.
    # BUGFIX: scan the ORIGINAL text — the previous code split `lowered`,
    # where str.isupper() can never be True, so this bonus never applied.
    if any(fragment.isupper() and len(fragment) > 3 for fragment in context.split()):
        score += 0.05

    return max(0.0, min(1.0, score))

def get_confidence_level(self, score: float) -> str:
"""
Get human-readable confidence level.
Expand Down
35 changes: 35 additions & 0 deletions xposure/correlate/dedup.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,25 @@ def _create_finding(self, candidate: Candidate) -> Finding:
confidence=candidate.confidence,
confidence_factors=[],
entropy=candidate.entropy,
severity=candidate.severity,
metadata=candidate.metadata.copy() if candidate.metadata else {},
)

# Preserve rule metadata and remediation guidance
if candidate.rule_id or candidate.rule_name:
finding.metadata.setdefault("rule", {})
if candidate.rule_id:
finding.metadata["rule"]["id"] = candidate.rule_id
if candidate.rule_name:
finding.metadata["rule"]["name"] = candidate.rule_name

if candidate.remediation:
finding.remediation = candidate.remediation

if candidate.verifier:
finding.metadata.setdefault("verification", {})
finding.metadata["verification"].setdefault("suggested_verifier", candidate.verifier)

return finding

def _merge_candidate(self, finding: Finding, candidate: Candidate):
Expand All @@ -105,6 +122,24 @@ def _merge_candidate(self, finding: Finding, candidate: Candidate):
f"seen in {len(finding.sources)} sources"
)

# Merge severity if existing finding lacks it
if not finding.severity and candidate.severity:
finding.severity = candidate.severity

# Merge metadata (rule/provider info)
if candidate.metadata:
finding.metadata.update({k: v for k, v in candidate.metadata.items() if k not in finding.metadata})

if candidate.rule_id or candidate.rule_name:
finding.metadata.setdefault("rule", {})
if candidate.rule_id:
finding.metadata["rule"].setdefault("id", candidate.rule_id)
if candidate.rule_name:
finding.metadata["rule"].setdefault("name", candidate.rule_name)

if candidate.remediation and not finding.remediation:
finding.remediation = candidate.remediation

def _mask_value(self, value: str, visible: int = 8) -> str:
"""
Mask credential value for safe display.
Expand Down
13 changes: 11 additions & 2 deletions xposure/extract/quick.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ class QuickScanner:
'github_token': r'(?:ghp|gho|ghu|ghs|ghr)_[a-zA-Z0-9]{36,}',
'github_fine': r'github_pat_[a-zA-Z0-9]{22}_[a-zA-Z0-9]{59}',

# Cloud & providers
'gcp_api_key': r'AIza[0-9A-Za-z\-_]{35}',
'digitalocean_pat': r'dop_v1_[a-f0-9]{64}',
'cloudflare_token': r'(?:CFP|CFU|cfp|cfu)[a-zA-Z0-9_-]{30,}',
'supabase_service_key': r'sb[a-z]{2}_[a-zA-Z0-9]{40,}',

# Slack
'slack_token': r'xox[baprs]-[0-9]{10,13}-[0-9]{10,13}-[a-zA-Z0-9]{24,}',
'slack_webhook': r'https://hooks\.slack\.com/services/T[A-Z0-9]{8,}/B[A-Z0-9]{8,}/[a-zA-Z0-9]{24}',
Expand Down Expand Up @@ -119,7 +125,8 @@ def scan(self, content: str, source: Source) -> Generator[Candidate, None, None]
high_confidence_patterns = {
'github_token', 'github_fine', 'slack_token',
'stripe_key', 'openai_key', 'anthropic_key',
'aws_access_key'
'aws_access_key', 'gcp_api_key', 'digitalocean_pat',
'cloudflare_token', 'supabase_service_key'
}

if entropy < self.min_entropy and pattern_name not in high_confidence_patterns:
Expand Down Expand Up @@ -180,7 +187,9 @@ def _initial_confidence(self, pattern_name: str, entropy: float) -> float:
high_confidence = {
'github_token', 'github_fine', 'slack_token',
'stripe_key', 'openai_key', 'anthropic_key',
'aws_access_key', 'slack_webhook'
'aws_access_key', 'slack_webhook',
'gcp_api_key', 'digitalocean_pat', 'cloudflare_token',
'supabase_service_key'
}

medium_confidence = {
Expand Down
Loading