AppMapper Design Document

What AppMapper Does

AppMapper is a semantic code understanding layer that answers natural language queries about a codebase. Traditional static analysis matches syntactic patterns; AppMapper instead indexes a description of what each code unit does, so it can answer questions about functionality, relationships, and security properties.


Core Capabilities

1. Semantic Code Indexing

AppMapper parses code into semantic units and enriches them with:

┌─────────────────────────────────────────────────────────────────┐
│                        Code Unit                                 │
├─────────────────────────────────────────────────────────────────┤
│  Identity        │ path, name, type, line numbers               │
│  Content         │ code, signature, docstring                   │
│  Context         │ parent class, imports, language              │
│  Relationships   │ calls, called_by                             │
├─────────────────────────────────────────────────────────────────┤
│  ENRICHMENTS (AppMapper adds):                                  │
├─────────────────────────────────────────────────────────────────┤
│  Description     │ "Handles user login via JWT authentication"  │
│  Functionality   │ [authentication, session_management]         │
│  Security Tags   │ [password_handling, token_generation]        │
│  Data Accessed   │ [users, sessions]                            │
│  Resources       │ [database, external_auth]                    │
└─────────────────────────────────────────────────────────────────┘
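The fields in the diagram map naturally onto a record type. The following is a minimal sketch, not the pipeline's actual `EnrichedCodeUnit` (whose definition is not shown in this document). The class name `CodeUnit`, the `id` format, and the comma-joined list encoding are all assumptions; the encoding assumes the vector store accepts only scalar metadata values, which is worth verifying against the ChromaDB version in use.

```python
from dataclasses import dataclass, field


@dataclass
class CodeUnit:
    """Hypothetical record mirroring the Code Unit diagram."""
    # Identity
    path: str
    name: str
    unit_type: str
    start_line: int
    end_line: int
    # Content
    code: str
    # Enrichments (filled in by the enricher)
    description: str = ""
    functionality: list[str] = field(default_factory=list)
    security_tags: list[str] = field(default_factory=list)
    data_accessed: list[str] = field(default_factory=list)
    resources: list[str] = field(default_factory=list)

    @property
    def id(self) -> str:
        # Assumed ID scheme: path, name, and start line
        return f"{self.path}:{self.name}:{self.start_line}"

    def to_metadata(self) -> dict:
        # Flatten list fields to comma-separated strings (assumed encoding)
        return {
            "path": self.path,
            "name": self.name,
            "type": self.unit_type,
            "start_line": self.start_line,
            "end_line": self.end_line,
            "functionality": ",".join(self.functionality),
            "security_tags": ",".join(self.security_tags),
            "data_accessed": ",".join(self.data_accessed),
            "resources": ",".join(self.resources),
        }


unit = CodeUnit(
    path="src/auth.py",
    name="login",
    unit_type="function",
    start_line=10,
    end_line=42,
    code="def login(): ...",
    description="Handles user login via JWT authentication",
    security_tags=["password_handling", "token_generation"],
)
meta = unit.to_metadata()
```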

2. Natural Language Queries

# Traditional SAST: Pattern matching
grep -r "eval(" src/  # Finds the pattern, but gives no context

# AppMapper: Semantic understanding
appmapper.query("Where is user input passed to eval?")
# Returns: src/utils/template.ts:45 - renderDynamic()
#          "Passes user-supplied template to eval for rendering"

3. Relationship Discovery

# Find similar code
appmapper.query("Find functions similar to getUserById")

# Find data flow
appmapper.query("What functions access user passwords?")

# Find patterns
appmapper.query("Which endpoints don't have authentication?")

4. Consistency Checking

# Compare authorization across endpoints
appmapper.query(
    "Compare GET /orders/:id and DELETE /orders/:id - "
    "do they both verify ownership?"
)
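Later snippets in this document read attributes such as `.confirmed` and `.evidence` from the value returned by `appmapper.query(...)`. The sketch below shows one possible shape for that result, with a canned stand-in for the service so calling code can be exercised without an LLM. The `answer` field, the `StubAppMapper` class, and the canned-table behavior are hypothetical.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class QueryResult:
    answer: str                       # free-text answer (hypothetical field)
    confirmed: bool = False           # True only when evidence supports a "yes"
    evidence: list[str] = field(default_factory=list)  # e.g. "file:line" references
    explanation: str = ""


class StubAppMapper:
    """Hypothetical stand-in that answers from a canned table instead of an LLM."""

    def __init__(self, canned: dict[str, QueryResult]):
        self._canned = canned

    async def query(self, question: str) -> QueryResult:
        # Unknown questions yield an unconfirmed result rather than raising,
        # so callers can treat "no evidence" uniformly.
        return self._canned.get(question, QueryResult(answer="unknown"))


stub = StubAppMapper({
    "Does get_order verify ownership?": QueryResult(
        answer="yes", confirmed=True, evidence=["src/orders.py:45"]
    ),
})
result = asyncio.run(stub.query("Does get_order verify ownership?"))
missing = asyncio.run(stub.query("Does delete_order verify ownership?"))
```

Defaulting `confirmed` to `False` means an unanswered question is treated the same as a negative answer, which errs toward reporting a potential issue.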

AppMapper Architecture

┌─────────────────────────────────────────────────────────────────────────┐
│                            AppMapper                                     │
│                                                                          │
│  ┌────────────┐   ┌────────────┐   ┌────────────┐   ┌────────────┐     │
│  │   Parser   │──▶│  Enricher  │──▶│  Indexer   │──▶│   Query    │     │
│  │            │   │            │   │            │   │   Engine   │     │
│  └────────────┘   └────────────┘   └────────────┘   └────────────┘     │
│        │               │               │                  │             │
│        ▼               ▼               ▼                  ▼             │
│   Tree-sitter     LLM + Rules      ChromaDB         Semantic Search    │
│   AST parsing     descriptions     embeddings       + LLM reasoning    │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

Input:  Source code directory
Output: Answers to natural language questions about the code
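The Indexer and Query Engine boxes amount to similarity search over unit descriptions. As a toy illustration only, the sketch below replaces learned embeddings with word counts and ChromaDB with a dict, and omits the LLM reasoning step. Function names and the sample index are hypothetical.

```python
import math
from collections import Counter


def embed(text: str) -> Counter:
    """Toy embedding: lowercase word counts, standing in for a learned embedding."""
    return Counter(text.lower().split())


def cosine(a: Counter, b: Counter) -> float:
    dot = sum(a[t] * b[t] for t in a)
    norm = math.sqrt(sum(v * v for v in a.values())) * math.sqrt(
        sum(v * v for v in b.values())
    )
    return dot / norm if norm else 0.0


def search(index: dict[str, str], question: str, n_results: int = 1) -> list[str]:
    """Return the IDs of the descriptions most similar to the question."""
    q = embed(question)
    ranked = sorted(index, key=lambda uid: cosine(embed(index[uid]), q), reverse=True)
    return ranked[:n_results]


# Descriptions as the enricher might produce them (sample data)
index = {
    "auth.login": "handles user login via jwt authentication",
    "orders.get_order": "fetches an order by id from the database",
}
top = search(index, "where is user authentication handled")
```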

What Questions Can AppMapper Answer?

| Category       | Example Questions                                  |
|----------------|----------------------------------------------------|
| Functionality  | "Where is user authentication handled?"            |
| Data Flow      | "What functions process credit card data?"         |
| Security       | "Which endpoints accept file uploads?"             |
| Authorization  | "Does this endpoint verify user ownership?"        |
| Comparison     | "Do similar endpoints have the same auth checks?"  |
| Missing Checks | "Which data endpoints lack authorization?"         |
| Relationships  | "What calls the payment processing function?"      |
| State          | "What are valid order status transitions?"         |

CVE-GEN Current Pipeline

┌─────────────────────────────────────────────────────────────────────────┐
│                         CVE-GEN Pipeline                                 │
│                                                                          │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐             │
│  │  Stage 1 │──▶│  Stage 2 │──▶│  Stage 3 │──▶│  Stage 4 │             │
│  │  Parse   │   │  Enrich  │   │  Index   │   │  SAST    │             │
│  └──────────┘   └──────────┘   └──────────┘   └──────────┘             │
│                                                     │                    │
│                                                     ▼                    │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐             │
│  │  Stage 8 │◀──│  Stage 7 │◀──│  Stage 6 │◀──│  Stage 5 │             │
│  │  Report  │   │  Verify  │   │  Chains  │   │  VulnRAG │             │
│  └──────────┘   └──────────┘   └──────────┘   └──────────┘             │
│                                                                          │
│  Current Outputs:                                                        │
│  ✓ SQL Injection, XSS, Command Injection                                │
│  ✓ Path Traversal, SSRF, Deserialization                                │
│  ✓ Known CVE pattern matches                                            │
│  ✗ IDOR (can't verify ownership checks)                                 │
│  ✗ Business Logic (no semantic understanding)                           │
│  ✗ Auth Bypass (can't find MISSING checks)                              │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

AppMapper Integration Points

┌─────────────────────────────────────────────────────────────────────────┐
│                    CVE-GEN + AppMapper Pipeline                          │
│                                                                          │
│  ┌──────────┐   ┌──────────┐   ┌──────────────────────┐   ┌──────────┐ │
│  │  Stage 1 │──▶│  Stage 2 │──▶│      Stage 3         │──▶│  Stage 4 │ │
│  │  Parse   │   │  Enrich  │   │  Index + AppMapper   │   │  SAST    │ │
│  └──────────┘   └──────────┘   │    Semantic Index    │   └──────────┘ │
│                                └──────────────────────┘        │        │
│                                          │                     │        │
│                    ┌─────────────────────┴─────────────────────┘        │
│                    │                                                     │
│                    ▼                                                     │
│  ┌─────────────────────────────────────────────────────────────────┐   │
│  │                 NEW: AppMapper Analysis Stage                    │   │
│  │                                                                  │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │   │
│  │  │    IDOR     │  │    Auth     │  │  Business   │              │   │
│  │  │  Detector   │  │ Consistency │  │   Logic     │              │   │
│  │  └─────────────┘  └─────────────┘  └─────────────┘              │   │
│  │                                                                  │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐              │   │
│  │  │   State     │  │  Semantic   │  │  Missing    │              │   │
│  │  │  Machine    │  │   STRIDE    │  │   Check     │              │   │
│  │  │  Validator  │  │ Enhancement │  │  Detector   │              │   │
│  │  └─────────────┘  └─────────────┘  └─────────────┘              │   │
│  │                                                                  │   │
│  └─────────────────────────────────────────────────────────────────┘   │
│                    │                                                     │
│                    ▼                                                     │
│  ┌──────────┐   ┌──────────┐   ┌──────────┐   ┌──────────┐             │
│  │  Stage 8 │◀──│  Stage 7 │◀──│  Stage 6 │◀──│  Stage 5 │             │
│  │  Report  │   │  Verify  │   │  Chains  │   │  VulnRAG │             │
│  └──────────┘   └──────────┘   └──────────┘   └──────────┘             │
│                                                                          │
│  NEW Outputs:                                                            │
│  ✓ IDOR vulnerabilities                                                 │
│  ✓ Authorization inconsistencies                                        │
│  ✓ Business logic flaws                                                 │
│  ✓ State machine bypasses                                               │
│  ✓ Missing security checks                                              │
│                                                                          │
└─────────────────────────────────────────────────────────────────────────┘

Integration Point Details

Integration Point 1: Enhanced Indexing (Stage 3)

Location: src/indexer/vectordb.py

Current:

class VectorIndexer:
    def index(self, units: list[EnrichedCodeUnit], project: str):
        # Stores code units with basic metadata
        for unit in units:
            self.collection.add(
                documents=[unit.description],
                metadatas=[unit.to_metadata()],
                ids=[unit.id]
            )

With AppMapper:

class VectorIndexer:
    def index(self, units: list[EnrichedCodeUnit], project: str):
        for unit in units:
            # NEW: Add semantic enrichments
            unit.semantic_purpose = self._get_purpose(unit)
            unit.data_accessed = self._get_data_access(unit)
            unit.auth_requirements = self._get_auth_info(unit)

            self.collection.add(
                documents=[unit.description],
                metadatas=[unit.to_enhanced_metadata()],
                ids=[unit.id]
            )

Integration Point 2: Attack Surface Analysis (Threat Model)

Location: src/threatmodel/attack_surface.py

Current:

class AttackSurfaceAnalyzer:
    def _find_entry_points(self, project_name: str) -> list[EntryPoint]:
        # Searches for routes by keywords
        results = self.indexer.search(
            query="API endpoint route handler HTTP request",
            n_results=500,
        )
        # Returns list of entry points with basic info

With AppMapper:

class AttackSurfaceAnalyzer:
    def __init__(self, indexer, appmapper=None):
        self.indexer = indexer
        self.appmapper = appmapper  # NEW

    # Now a coroutine, because the AppMapper queries are awaited
    async def _find_entry_points(self, project_name: str) -> list[EntryPoint]:
        results = self.indexer.search(...)

        if self.appmapper:
            for ep in results:
                # NEW: Semantic enhancement
                ep.semantic_info = await self.analyze_endpoint(ep)

        return results

    # NEW METHOD
    async def analyze_endpoint(self, ep: EntryPoint) -> EndpointAnalysis:
        query = self.appmapper.query  # all questions go through AppMapper
        return {
            "purpose": await query(f"What does {ep.name} do?"),
            "data_accessed": await query(f"What data does {ep.name} access?"),
            "has_auth": await query(f"Does {ep.name} require authentication?"),
            "has_ownership_check": await query(f"Does {ep.name} verify resource ownership?"),
            "similar_endpoints": await query(f"Find similar endpoints to {ep.name}"),
        }
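The five questions in `analyze_endpoint` are independent of each other, so awaiting them one at a time costs roughly five query latencies per endpoint. A possible alternative, sketched here with a fake query function, issues them concurrently with `asyncio.gather`. The free-function form and the `fake_query` helper are illustrative assumptions, not part of the design.

```python
import asyncio


async def analyze_endpoint(query, name: str) -> dict:
    """Ask the per-endpoint questions concurrently.

    `query` is any async callable taking a question string (an assumption;
    in the pipeline this would be the AppMapper query method).
    """
    questions = {
        "purpose": f"What does {name} do?",
        "data_accessed": f"What data does {name} access?",
        "has_auth": f"Does {name} require authentication?",
        "has_ownership_check": f"Does {name} verify resource ownership?",
        "similar_endpoints": f"Find similar endpoints to {name}",
    }
    # gather returns results in argument order, so answers line up with the keys
    answers = await asyncio.gather(*(query(q) for q in questions.values()))
    return dict(zip(questions.keys(), answers))


async def fake_query(question: str) -> str:
    await asyncio.sleep(0)  # placeholder for network/LLM latency
    return f"answer to: {question}"


analysis = asyncio.run(analyze_endpoint(fake_query, "get_order"))
```

If the backing LLM is rate limited, an `asyncio.Semaphore` around the query call would be one way to cap concurrency.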

Integration Point 3: STRIDE Analysis Enhancement

Location: src/threatmodel/stride.py

Current:

class StrideAnalyzer:
    # Maps security tags to threats using templates
    TAG_TO_STRIDE = {
        "authentication": [STRIDE_CATEGORY.SPOOFING],
        "authorization": [STRIDE_CATEGORY.ELEVATION_OF_PRIVILEGE],
        ...
    }

    def _analyze_entry_point(self, ep: EntryPoint) -> list[Threat]:
        for tag in ep.security_tags:
            if tag in self.TAG_TO_STRIDE:
                # Creates threat from template

With AppMapper:

class StrideAnalyzer:
    def __init__(self, appmapper=None):
        self.appmapper = appmapper

    async def _analyze_entry_point(self, ep: EntryPoint) -> list[Threat]:
        threats = []

        # Existing template-based threats
        threats.extend(self._template_threats(ep))

        if self.appmapper:
            # NEW: Semantic threat detection

            # Check for ACTUAL auth (not just tag)
            auth_check = await self.appmapper.query(
                f"Does {ep.name} actually verify authentication? Show evidence."
            )
            if not auth_check.confirmed:
                threats.append(Threat(
                    category=STRIDE_CATEGORY.SPOOFING,
                    title=f"Missing Authentication in {ep.name}",
                    evidence=auth_check.evidence
                ))

            # Check for ownership verification (IDOR)
            ownership_check = await self.appmapper.query(
                f"Does {ep.name} verify the user owns the resource?"
            )
            if not ownership_check.confirmed and ep.accesses_user_data:
                threats.append(Threat(
                    category=STRIDE_CATEGORY.ELEVATION_OF_PRIVILEGE,
                    title=f"Potential IDOR in {ep.name}",
                    evidence=ownership_check.evidence
                ))

        return threats

Integration Point 4: New AppMapper Analysis Stage

Location: src/pipeline/stage_appmapper.py (NEW FILE)

class AppMapperAnalysisStage:
    """
    New pipeline stage that performs semantic security analysis.
    Runs after SAST, before VulnRAG matching.
    """

    def __init__(self, appmapper: AppMapperService):
        self.appmapper = appmapper

        # Initialize detectors
        self.idor_detector = IDORDetector(appmapper)
        self.auth_checker = AuthConsistencyChecker(appmapper)
        self.business_logic = BusinessLogicAnalyzer(appmapper)
        self.state_machine = StateMachineValidator(appmapper)
        self.missing_check = MissingCheckDetector(appmapper)

    async def process(self, context: ScanContext) -> ScanContext:
        """Run all semantic analyses."""

        # 1. IDOR Detection
        context.findings.extend(
            await self.idor_detector.scan(context.project)
        )

        # 2. Authorization Consistency
        context.findings.extend(
            await self.auth_checker.check(context.project)
        )

        # 3. Business Logic Vulnerabilities
        context.findings.extend(
            await self.business_logic.analyze(context.project)
        )

        # 4. State Machine Validation
        context.findings.extend(
            await self.state_machine.validate(context.project)
        )

        # 5. Missing Security Checks
        context.findings.extend(
            await self.missing_check.detect(context.project)
        )

        return context
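As written, an exception in any one detector aborts the whole stage and discards findings from the detectors that already ran. One way to isolate failures is sketched below. The `run_detectors` helper, the label-to-callable mapping, and the two sample detectors are hypothetical.

```python
import asyncio


async def run_detectors(detectors: dict, project: str):
    """Run each detector in turn, collecting findings and per-detector errors.

    `detectors` maps a label to an async callable taking the project name
    (hypothetical shape; the stage calls scan/check/analyze/validate/detect directly).
    """
    findings, errors = [], {}
    for label, detect in detectors.items():
        try:
            findings.extend(await detect(project))
        except Exception as exc:  # one failing detector should not abort the stage
            errors[label] = str(exc)
    return findings, errors


async def ok_detector(project: str):
    return [f"IDOR in {project}/get_order"]


async def broken_detector(project: str):
    raise RuntimeError("LLM timeout")


findings, errors = asyncio.run(
    run_detectors({"idor": ok_detector, "auth": broken_detector}, "shop")
)
```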

AppMapper Services (New Components)

Service 1: IDOR Detector

File: src/appmapper/idor_detector.py

class IDORDetector:
    """
    Detects Insecure Direct Object Reference vulnerabilities
    by analyzing authorization patterns across endpoints.
    """

    def __init__(self, appmapper):
        self.appmapper = appmapper  # query interface used by scan()

    async def scan(self, project: str) -> list[Finding]:
        findings = []

        # Step 1: Find all "get by ID" endpoints
        id_endpoints = await self.appmapper.query(
            "Find all endpoints that retrieve resources by ID parameter"
        )

        # Step 2: Group by resource type
        by_resource = self._group_by_resource(id_endpoints)
        # Result: {"orders": [get_order, delete_order], "users": [...]}

        # Step 3: Check authorization for each
        for resource, endpoints in by_resource.items():
            auth_status = []

            for ep in endpoints:
                has_auth = await self.appmapper.query(
                    f"Does {ep.name} verify the user owns the {resource}?"
                )
                auth_status.append((ep, has_auth.confirmed))

            # Step 4: Find inconsistencies
            has_check = [ep for ep, auth in auth_status if auth]
            no_check = [ep for ep, auth in auth_status if not auth]

            if has_check and no_check:
                # Some endpoints check, some don't = vulnerability
                for ep in no_check:
                    findings.append(Finding(
                        type="IDOR",
                        severity="HIGH",
                        endpoint=ep,
                        title=f"IDOR in {ep.name}",
                        description=f"{ep.name} accesses {resource} without ownership check. "
                                   f"Similar endpoints {[e.name for e in has_check]} do check.",
                        recommendation="Add ownership verification"
                    ))

        return findings

Detection Flow:

┌─────────────────────────────────────────────────────────────────┐
│                    IDOR Detection Flow                           │
│                                                                  │
│  Find ID-based          Group by              Check each         │
│  endpoints              resource              for auth           │
│       │                    │                     │               │
│       ▼                    ▼                     ▼               │
│  ┌─────────┐          ┌─────────┐          ┌─────────┐          │
│  │GET /x/:id│         │ orders: │          │ GET: NO │          │
│  │PUT /x/:id│   ──▶   │  - GET  │    ──▶   │ PUT: YES│   ──▶    │
│  │DELETE /x │         │  - PUT  │          │ DEL: YES│          │
│  └─────────┘          │  - DEL  │          └─────────┘          │
│                       └─────────┘                │               │
│                                                  ▼               │
│                                          ┌─────────────┐        │
│                                          │ IDOR Finding│        │
│                                          │ GET lacks   │        │
│                                          │ auth check  │        │
│                                          └─────────────┘        │
└─────────────────────────────────────────────────────────────────┘
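The "Group by resource" step relies on a `_group_by_resource` helper that is not shown. A minimal sketch of one way to implement it takes the last non-parameter path segment as the resource name. The `Endpoint` record, the parameter-marker heuristic (`:id`, `{id}`, `<id>`), and the sample routes are assumptions.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass(frozen=True)
class Endpoint:
    """Hypothetical minimal endpoint record."""
    name: str
    method: str
    path: str


def group_by_resource(endpoints: list[Endpoint]) -> dict[str, list[Endpoint]]:
    groups: dict[str, list[Endpoint]] = defaultdict(list)
    for ep in endpoints:
        segments = [s for s in ep.path.strip("/").split("/") if s]
        # Drop parameter segments such as ":id", "{id}", or "<int:id>"
        static = [s for s in segments if not s.startswith((":", "{", "<"))]
        resource = static[-1] if static else "unknown"
        groups[resource].append(ep)
    return dict(groups)


endpoints = [
    Endpoint("get_order", "GET", "/orders/:id"),
    Endpoint("delete_order", "DELETE", "/orders/:id"),
    Endpoint("get_user", "GET", "/api/users/<int:user_id>"),
]
groups = group_by_resource(endpoints)
```

Nested routes such as `/orders/:id/items/:item_id` are grouped under `items` by this heuristic, which may or may not be the desired resource in a given codebase.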

Service 2: Authorization Consistency Checker

File: src/appmapper/auth_consistency.py

class AuthConsistencyChecker:
    """
    Finds authorization inconsistencies by comparing
    similar endpoints for auth pattern differences.
    """

    def __init__(self, appmapper):
        self.appmapper = appmapper  # query interface used by check()

    async def check(self, project: str) -> list[Finding]:
        findings = []

        # Step 1: Find all auth patterns used
        auth_patterns = await self.appmapper.query(
            "Find all authentication middleware and authorization decorators"
        )

        # Step 2: Find protected endpoints
        protected = await self.appmapper.query(
            "Find endpoints that use authentication"
        )

        # Step 3: Find sensitive endpoints
        sensitive = await self.appmapper.query(
            "Find endpoints that access user data, modify records, "
            "or perform sensitive operations"
        )

        # Step 4: Find unprotected sensitive endpoints
        protected_paths = {ep.path for ep in protected}

        for ep in sensitive:
            if ep.path not in protected_paths:
                # Verify with specific query
                check = await self.appmapper.query(
                    f"Does {ep.name} require authentication?"
                )

                if not check.confirmed:
                    findings.append(Finding(
                        type="AUTH_MISSING",
                        severity="HIGH",
                        endpoint=ep,
                        title=f"Sensitive endpoint without auth: {ep.name}",
                        description=f"{ep.name} performs sensitive operations but has no auth"
                    ))

        return findings
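Step 4 compares endpoints by `path` alone. If `GET /orders/:id` is protected and `DELETE /orders/:id` is not, a path-only comparison treats the DELETE route as protected and never issues the follow-up query. A sketch of keying on method and path together follows. The `Route` record and its `method` field are assumptions, since the endpoint type is not shown in this document.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Route:
    """Hypothetical route record with an explicit HTTP method."""
    name: str
    method: str
    path: str


def unprotected(sensitive: list[Route], protected: list[Route]) -> list[Route]:
    """Return sensitive routes whose (method, path) pair is not in the protected set."""
    protected_keys = {(r.method.upper(), r.path) for r in protected}
    return [r for r in sensitive if (r.method.upper(), r.path) not in protected_keys]


get_order = Route("get_order", "GET", "/orders/:id")
delete_order = Route("delete_order", "DELETE", "/orders/:id")

candidates = unprotected(sensitive=[get_order, delete_order], protected=[get_order])

# Path-only comparison, as in Step 4, misses the unprotected DELETE route
protected_paths = {r.path for r in [get_order]}
path_only = [r for r in [get_order, delete_order] if r.path not in protected_paths]
```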

Service 3: Business Logic Analyzer

File: src/appmapper/business_logic.py

class BusinessLogicAnalyzer:
    """
    Detects business logic vulnerabilities using semantic queries.
    """

    # Detection queries for each vulnerability type
    QUERIES = {
        "validation_timing": {
            "questions": [
                "Can data be modified between validation and execution?",
                "Is there a gap between price calculation and payment?",
            ],
            "indicators": ["cart", "checkout", "payment", "validate"]
        },
        "race_condition": {
            "questions": [
                "Are limited resources (discounts, inventory) handled atomically?",
                "Can concurrent requests cause double-spending?",
            ],
            "indicators": ["discount", "coupon", "inventory", "balance"]
        },
        "state_bypass": {
            "questions": [
                "Can order status be set directly without validation?",
                "Are state transitions enforced?",
            ],
            "indicators": ["status", "state", "workflow", "transition"]
        }
    }

    def __init__(self, appmapper):
        self.appmapper = appmapper  # query interface used by analyze()

    async def analyze(self, project: str) -> list[Finding]:
        findings = []

        for vuln_type, config in self.QUERIES.items():
            # Check if codebase has relevant functionality
            has_functionality = await self.appmapper.query(
                f"Does this codebase have functionality related to "
                f"{', '.join(config['indicators'])}?"
            )

            if not has_functionality.confirmed:
                continue

            # Ask detection questions
            for question in config["questions"]:
                result = await self.appmapper.query(question)

                if result.indicates_vulnerability:
                    findings.append(Finding(
                        type=f"BUSINESS_LOGIC_{vuln_type.upper()}",
                        severity="MEDIUM",
                        title=f"Potential {vuln_type} vulnerability",
                        description=result.explanation,
                        evidence=result.code_references
                    ))

        return findings
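The relevance check in `analyze` spends one semantic query per vulnerability type before any detection question is asked. A cheaper pre-filter, sketched here under the assumption that unit descriptions are available locally as plain strings, matches the `indicators` keywords first. The `relevant_vuln_types` helper and the sample descriptions are hypothetical.

```python
def relevant_vuln_types(queries: dict, descriptions: list[str]) -> list[str]:
    """Return vulnerability types with at least one indicator in the descriptions.

    Uses plain substring matching, so "state" also matches "statement";
    a word-boundary match would be stricter.
    """
    corpus = " ".join(descriptions).lower()
    return [
        vuln_type
        for vuln_type, config in queries.items()
        if any(indicator in corpus for indicator in config["indicators"])
    ]


QUERIES = {
    "validation_timing": {"indicators": ["cart", "checkout", "payment", "validate"]},
    "race_condition": {"indicators": ["discount", "coupon", "inventory", "balance"]},
    "state_bypass": {"indicators": ["status", "state", "workflow", "transition"]},
}
descriptions = [
    "Applies a coupon code to the cart total",
    "Sends a welcome email",
]
relevant = relevant_vuln_types(QUERIES, descriptions)
```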

Service 4: Missing Check Detector

File: src/appmapper/missing_check.py

class MissingCheckDetector:
    """
    Detects missing security checks by comparing similar code.
    This is AppMapper's unique capability - finding what's NOT there.
    """

    def __init__(self, appmapper):
        self.appmapper = appmapper  # query interface used by detect()

    async def detect(self, project: str) -> list[Finding]:
        findings = []

        # Strategy: Find groups of similar functions, check for consistency

        # 1. Find all data modification functions
        modifiers = await self.appmapper.query(
            "Find all functions that modify user data, orders, or records"
        )

        # 2. Group by operation type
        groups = self._group_similar(modifiers)

        # 3. For each group, check what security patterns exist
        for group_name, functions in groups.items():
            security_checks = {}

            for func in functions:
                checks = await self.appmapper.query(
                    f"What security checks does {func.name} perform? "
                    f"Look for: auth, ownership, validation, rate limiting"
                )
                security_checks[func] = checks.found_checks

            # 4. Find functions missing checks that siblings have
            all_checks = set()
            for checks in security_checks.values():
                all_checks.update(checks)

            for func, checks in security_checks.items():
                missing = all_checks - set(checks)
                if missing:
                    findings.append(Finding(
                        type="MISSING_CHECK",
                        severity="MEDIUM",
                        function=func,
                        title=f"Missing security checks in {func.name}",
                        description=f"Similar functions perform these checks, but {func.name} does not: "
                                   f"{', '.join(sorted(missing))}",
                        recommendation=f"Add these checks for consistency: {', '.join(sorted(missing))}"
                    ))

        return findings
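Step 4 treats the union of all checks in a group as the expected set, so a check performed by a single function is reported as missing from every sibling. One possible refinement, sketched below, expects only those checks that more than a given share of the group performs. The `missing_checks` helper, the `min_share` threshold, and the sample group are assumptions.

```python
from collections import Counter


def missing_checks(
    security_checks: dict[str, set[str]], min_share: float = 0.5
) -> dict[str, set[str]]:
    """Map each function to the checks it lacks, counting a check as expected
    only when more than `min_share` of the group performs it."""
    counts = Counter(check for checks in security_checks.values() for check in checks)
    total = len(security_checks)
    expected = {check for check, n in counts.items() if n / total > min_share}
    return {
        name: expected - checks
        for name, checks in security_checks.items()
        if expected - checks
    }


group = {
    "update_order": {"auth", "ownership", "validation"},
    "cancel_order": {"auth", "ownership"},
    "delete_order": {"auth"},
    "refund_order": {"auth", "ownership", "rate_limiting"},
}
gaps = missing_checks(group)
```

With the union rule, all four functions in this sample group would be flagged; with the majority rule, only `delete_order` is reported, for lacking the ownership check.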

File Structure After Integration

src/
├── parser/                      # Existing
│   ├── __init__.py
│   ├── base.py
│   ├── tree_sitter_parser.py
│   └── patterns.py
│
├── enricher/                    # Existing
│   ├── __init__.py
│   ├── llm.py
│   └── rules.py
│
├── indexer/                     # Existing (Enhanced)
│   ├── __init__.py
│   ├── vectordb.py              # Enhanced with semantic fields
│   └── raw_text.py
│
├── query/                       # Existing
│   ├── __init__.py
│   └── engine.py
│
├── threatmodel/                 # Existing (Enhanced)
│   ├── __init__.py
│   ├── attack_surface.py        # Enhanced with AppMapper
│   ├── stride.py                # Enhanced with AppMapper
│   ├── dfd.py
│   └── ...
│
├── vulnrag/                     # Existing
│   ├── __init__.py
│   ├── vuln_indexer.py
│   └── vuln_query.py
│
├── appmapper/                   # NEW - AppMapper Services
│   ├── __init__.py
│   ├── service.py               # Core AppMapper query interface
│   ├── idor_detector.py         # IDOR vulnerability detection
│   ├── auth_consistency.py      # Authorization consistency checking
│   ├── business_logic.py        # Business logic flaw detection
│   ├── state_machine.py         # State machine validation
│   ├── missing_check.py         # Missing security check detection
│   └── semantic_stride.py       # Enhanced STRIDE with semantics
│
├── pipeline/                    # Existing (Enhanced)
│   ├── __init__.py
│   ├── stage_parse.py
│   ├── stage_enrich.py
│   ├── stage_sast.py
│   ├── stage_appmapper.py       # NEW - AppMapper analysis stage
│   ├── stage_vulnrag.py
│   ├── stage_chains.py
│   └── stage_report.py
│
└── web/                         # Existing (Enhanced)
    ├── __init__.py
    ├── app.py                   # Add AppMapper API endpoints
    └── templates/

API Endpoints (New)

# Add to src/web/app.py
# The detector methods are coroutines, so each view is async and awaits them.
# Async views need Flask 2.0+ installed with the "async" extra: pip install "flask[async]"

# IDOR Analysis
@app.route("/api/appmapper/<project>/idor")
async def get_idor_analysis(project):
    detector = IDORDetector(appmapper)
    findings = await detector.scan(project)
    return jsonify(findings)

# Authorization Consistency
@app.route("/api/appmapper/<project>/auth-consistency")
async def get_auth_consistency(project):
    checker = AuthConsistencyChecker(appmapper)
    findings = await checker.check(project)
    return jsonify(findings)

# Business Logic
@app.route("/api/appmapper/<project>/business-logic")
async def get_business_logic(project):
    analyzer = BusinessLogicAnalyzer(appmapper)
    findings = await analyzer.analyze(project)
    return jsonify(findings)

# Semantic Query (generic)
@app.route("/api/appmapper/<project>/query", methods=["POST"])
async def semantic_query(project):
    question = request.json.get("question")
    result = await appmapper.query(question, project)
    return jsonify(result)

# Full AppMapper Scan
@app.route("/api/appmapper/<project>/scan")
async def full_appmapper_scan(project):
    """Run all AppMapper analyses."""
    results = {
        "idor": await IDORDetector(appmapper).scan(project),
        "auth": await AuthConsistencyChecker(appmapper).check(project),
        "business_logic": await BusinessLogicAnalyzer(appmapper).analyze(project),
        "missing_checks": await MissingCheckDetector(appmapper).detect(project),
    }
    return jsonify(results)
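The endpoints pass lists of findings straight to `jsonify`, which works only if every value in a finding is JSON-serializable. The `Finding` type is not defined in this document, so the sketch below assumes a flat dataclass and converts it to plain dicts before serialization. The field set and the `serialize` helper are hypothetical.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Finding:
    """Hypothetical flat finding record; the real Finding type is not shown here."""
    type: str
    severity: str
    title: str
    description: str = ""
    recommendation: str = ""


def serialize(findings: list[Finding]) -> list[dict]:
    """Convert findings to plain dicts that any JSON encoder accepts."""
    return [asdict(f) for f in findings]


payload = serialize([
    Finding(type="IDOR", severity="HIGH", title="IDOR in get_order"),
])
body = json.dumps(payload)  # raises TypeError if a value is not serializable
```

Findings that embed richer objects, such as the `endpoint=ep` argument used by the detectors, would need those objects reduced to serializable fields first.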

Summary: What AppMapper Adds to CVE-GEN

| CVE-GEN Alone                   | + AppMapper                                      |
|---------------------------------|--------------------------------------------------|
| Finds SQL injection patterns    | Finds IDOR by comparing auth across endpoints    |
| Tags code with "authorization"  | Verifies authorization actually exists           |
| Finds routes with security tags | Finds routes MISSING security checks             |
| Pattern-based business logic    | Semantic business logic analysis                 |
| Template-based STRIDE           | Evidence-based STRIDE with semantic verification |

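Key Differentiator: AppMapper can answer "Does this endpoint verify ownership?", a question that pattern matching alone cannot answer, because the vulnerability is the absence of a check rather than the presence of a pattern.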


Implementation Phases

| Phase             | Duration | Deliverables                                 |
|-------------------|----------|----------------------------------------------|
| 1. Core           | 2 weeks  | AppMapperService, IDORDetector               |
| 2. Auth           | 2 weeks  | AuthConsistencyChecker, Enhanced STRIDE      |
| 3. Business Logic | 2 weeks  | BusinessLogicAnalyzer, StateMachineValidator |
| 4. Integration    | 2 weeks  | Pipeline stage, API endpoints, UI            |