diff --git a/datagen/rule2code/cwe2code.py b/datagen/rule2code/cwe2code.py index 287a10f..bff493a 100644 --- a/datagen/rule2code/cwe2code.py +++ b/datagen/rule2code/cwe2code.py @@ -275,7 +275,7 @@ def datagen_for_one_cwe(cwe_id, markdown, depth, remote_api=False): def main( parallel=256, - output_path="outputs/rule2code/cwe2code-raw.jsonl", + output_path="outputs/rule2code/cwe2code.jsonl", depth=1, remote_api=False, ): diff --git a/datagen/rule2code/get_bandit_rules.py b/datagen/rule2code/get_bandit_rules.py index a1ec64d..e631e59 100644 --- a/datagen/rule2code/get_bandit_rules.py +++ b/datagen/rule2code/get_bandit_rules.py @@ -2,4 +2,189 @@ # # SPDX-License-Identifier: Apache-2.0 -# TODO(@zhewang2001): Please refactor the corresponding code snippets and then upload it. +""" +Scrape all flake8-bandit (Sxxx) rules from the Ruff docs. + +Output: bandit_rules.json ― list[{code,name,short_msg,url,full_text}] +""" + +from __future__ import annotations + +import json +import re +import time +from pathlib import Path +from typing import Any, Dict, Iterable, Optional +from urllib.parse import urljoin + +import fire +import requests +from bs4 import BeautifulSoup, Tag + +SITE = "https://docs.astral.sh" +RULES_DIR = f"{SITE}/ruff/rules/" # <-- NEW +LISTING = f"{RULES_DIR}#flake8-bandit-s" +HEADERS = {"User-Agent": "bandit-scraper/0.2 (+https://github.com/you)"} + +SECTION_HEADINGS = { + "what it does": "what_it_does", + "why is this bad?": "why_bad", + "example": "example_bad", + "use instead:": "example_good", +} + +TITLE_RE = re.compile(r"^(?P
<title>.+?)\s*\((?P<code>S\d{3})\)$", re.I)
+
+BANDIT_RE = re.compile(r"\b[bB](\d{3})\b") # matches B605, b401, …
+
+
def load_ruff_rules(path: str | Path = "bandit_rules.json") -> Dict[str, dict]:
    """Load the scraped rules file and index it by Ruff code for O(1) lookup."""
    raw = Path(path).read_text()
    indexed: Dict[str, dict] = {}
    for rule in json.loads(raw):
        indexed[rule["code"]] = rule  # e.g. "S605": {...}
    return indexed
+
+
def bandit_id(text: str) -> Optional[str]:
    """Return the Bandit rule id found in *text* as 'B605', or None."""
    hit = BANDIT_RE.search(text)
    if hit is None:
        return None
    return f"B{hit.group(1)}"
+
+
def ruff_code(bid: str) -> str:
    """Map a Bandit id to the flake8-bandit / Ruff code: 'B605' -> 'S605'."""
    return f"S{bid[1:]}"
+
+
def enrich(recs: Iterable[dict], rules: Dict[str, Any]) -> Iterable[dict]:
    """Yield each record with Bandit/Ruff metadata attached (mutates in place)."""
    for record in recs:
        bid = bandit_id(record["recommendation_text"])
        code = ruff_code(bid) if bid else None
        record["bandit_id"] = bid
        record["ruff_code"] = code
        # rules.get(None) is simply None when no Bandit id was found.
        record["ruff_rule"] = rules.get(code)
        yield record
+
+
def categorize_bandit_text(full_text: str) -> Dict[str, Optional[str]]:
    """Split a flattened Ruff rule page into its titled sections.

    Returns a dict with the rule ``code`` and ``title`` plus the text under
    the "What it does" / "Why is this bad?" / "Example" / "Use instead:"
    headings; a key stays None when that section is absent. Text appearing
    before the first recognised heading lands under ``remainder``.

    Raises:
        ValueError: if *full_text* is empty, or its first non-blank line does
            not match TITLE_RE (i.e. does not end in "(Sxxx)").
    """
    raw_lines = full_text.splitlines()
    lines = []

    # Normalise blank lines: drop leading blanks and collapse runs of blank
    # lines down to a single empty line between non-empty ones.
    for line in raw_lines:
        if line.strip():
            lines.append(line.rstrip())
        elif lines and lines[-1].strip():
            lines.append("")

    if not lines:
        raise ValueError("empty text")

    m = TITLE_RE.match(lines[0].strip())
    if not m:
        raise ValueError(f"unexpected title line {lines[0]!r}")

    out = {
        "code": m.group("code"),
        "title": m.group("title"),
        "what_it_does": None,
        "why_bad": None,
        "example_bad": None,
        "example_good": None,
        "remainder": None,
    }

    # Accumulator state: `buf` holds the lines of the section currently being
    # read; `current_key` is the out[] slot they will be flushed into.
    current_key = "remainder"
    buf = []

    def flush():
        # Commit `buf` into out[current_key], applying per-section cleanup.
        if buf:
            text = "\n".join(buf).rstrip()
            if current_key in ["example_bad", "example_good"]:
                # Code sections: cut trailing References/Note/Options footers.
                text = text.split("\nReferences")[0].rstrip()
                text = text.split("\nNote")[0].rstrip()
                text = text.split("\nOptions")[0].rstrip()
            elif current_key in ["what_it_does", "why_bad"]:
                # Prose sections: collapse all whitespace to single spaces.
                text = " ".join(text.split())
            if out[current_key]:
                # A heading can repeat on a page; append rather than replace.
                out[current_key] += "\n" + text
            else:
                out[current_key] = text
        buf.clear()

    for ln in lines[1:]:
        # A line that exactly matches a known heading starts a new section.
        key = SECTION_HEADINGS.get(ln.strip().lower())
        if key:
            flush()
            current_key = key
            continue
        buf.append(ln)
    flush()  # commit the final section
    return out
+
+
def get_soup(url: str) -> BeautifulSoup:
    """GET *url* and parse the response body with the stdlib html.parser."""
    resp = requests.get(url, headers=HEADERS, timeout=30)
    resp.raise_for_status()  # surface HTTP errors to the caller
    return BeautifulSoup(resp.text, "html.parser")
+
+
def bandit_table(doc: BeautifulSoup) -> Tag:
    """Locate the rules table under the 'flake8-bandit (S)' heading."""
    anchor = doc.find(id="flake8-bandit-s")
    if not anchor:
        raise RuntimeError("unable to find flake8-bandit section")
    # The listing table immediately follows the section heading.
    return anchor.find_next("table")
+
+
def row_to_meta(tr: Tag) -> dict[str, str]:
    """Extract code, name, short message and doc URL from one listing row."""
    cells = tr.find_all("td")
    link = cells[1].find("a")
    # Listing hrefs are relative; resolve against the rules directory so the
    # per-rule pages can be fetched directly.
    return {
        "code": cells[0].text.strip(),
        "name": link.text.strip(),
        "short_msg": cells[2].get_text(" ", strip=True),
        "url": urljoin(RULES_DIR, link["href"].lstrip("/")),
    }
+
+
def page_markdown(url: str) -> str:
    """Fetch a rule page and flatten it to text, keeping <pre> blocks verbatim."""
    soup = get_soup(url)
    body = soup.find("article") or soup
    # Drop navigation chrome so only the article text survives.
    for junk in body.select("nav, aside, footer"):
        junk.decompose()

    # Swap every <pre> for a placeholder so get_text() cannot mangle code
    # formatting, then splice the raw contents back in afterwards.
    pre_blocks = []
    for pre in body.find_all("pre"):
        pre_blocks.append(pre.get_text(separator="", strip=False))
        pre.replace_with(f"__PRE_PLACEHOLDER_{len(pre_blocks) - 1}__")

    text = re.sub(r"\n{3,}", "\n\n", body.get_text("\n", strip=False))

    for idx, block in enumerate(pre_blocks):
        text = text.replace(f"__PRE_PLACEHOLDER_{idx}__", block)

    return text
+
+
def main(output_file: str = "bandit_rules.json") -> None:
    """Scrape every flake8-bandit rule page and dump the rules as JSON."""
    listing = get_soup(LISTING)
    scraped = []
    for tr in bandit_table(listing).tbody.find_all("tr"):
        meta = row_to_meta(tr)
        try:
            meta["full_text"] = categorize_bandit_text(page_markdown(meta["url"]))
        except requests.HTTPError as e:
            # A dead rule page should not abort the whole scrape.
            print(f"[WARN] {meta['code']}: {e}")
            continue
        scraped.append(meta)
        time.sleep(0.3)  # be polite to the docs server
    Path(output_file).write_text(json.dumps(scraped, indent=2, ensure_ascii=False))
    print(f"✓ scraped {len(scraped)} rules → {output_file}")


if __name__ == "__main__":
    fire.Fire(main)
diff --git a/datagen/rule2code/guru2code.py b/datagen/rule2code/guru2code.py
index a1ec64d..ac64a35 100644
--- a/datagen/rule2code/guru2code.py
+++ b/datagen/rule2code/guru2code.py
@@ -2,4 +2,252 @@
#
# SPDX-License-Identifier: Apache-2.0
-# TODO(@zhewang2001): Please refactor the corresponding code snippets and then upload it.
+import json
+import os
+import random
+from concurrent.futures import ThreadPoolExecutor, as_completed
+
+from datasets import load_dataset
+from dotenv import load_dotenv
+from litellm import batch_completion
+from openai import OpenAI
+from rich import print as rprint
+from tqdm import tqdm
+
+
def generate_initial_prompt(seed_data):
    """Build the first-turn prompt asking for a vulnerable Python example.

    Curated compliant/noncompliant examples are only shown when the seed has
    no scraped code snippets; otherwise up to five random snippets are shown.
    """
    name = seed_data["name"]
    description = seed_data["description"]
    snippets = seed_data.get("code_snippets") or []

    example_section = ""
    if not snippets:
        noncompliant = seed_data.get("noncompliant_example")
        if noncompliant:
            example_section += f"""### Noncompliant Example
```
{noncompliant}
```

"""
        compliant = seed_data.get("compliant_example")
        if compliant:
            example_section += f"""### Compliant Example
```
{compliant}
```
"""

    snippets_section = ""
    if snippets:
        chosen = random.sample(snippets, min(len(snippets), 5))
        snippets_section = (
            "\n## Code Snippets that Contain the Vulnerability for Inspiration\n"
        )
        for idx, snippet in enumerate(chosen, 1):
            snippets_section += f"""### Example {idx}
{snippet}

"""

    if example_section:
        example_section = f"\n## Code Examples\n{example_section}"

    return f"""Given the following information about the "{name}", please demonstrate complete PYTHON code examples that demonstrate this vulnerability or violate this security pattern:

--- Information for "{name}" ---
## Description of "{name}"
{description}
{example_section}{snippets_section}--- End of Information ---
Now, please create a code example that demonstrates this vulnerability or violates this security pattern.

Notes:
* Create a code example in PYTHON
* The code example you generate **MUST** contain **this** vulnerability or violate this security pattern, and the vulnerability in the code example MUST actually be detected by CodeGuru.
* Format: An example output is demonstrate below:

--- BEGIN OF EXAMPLE ---
## Code Example

```python
[your code here]
```
## Explanation

[Explain why this code contains this vulnerability or violates this security pattern.]
--- END OF EXAMPLE ---"""
+
+
def generate_followup_prompt(seed_data):
    """Build a follow-up prompt asking for a NEW example of the same issue.

    Section construction mirrors generate_initial_prompt: curated examples
    only when no scraped snippets exist, otherwise up to five random snippets.
    """
    name = seed_data["name"]
    description = seed_data["description"]
    snippets = seed_data.get("code_snippets") or []

    example_section = ""
    if not snippets:
        noncompliant = seed_data.get("noncompliant_example")
        if noncompliant:
            example_section += f"""### Noncompliant Example
```
{noncompliant}
```

"""
        compliant = seed_data.get("compliant_example")
        if compliant:
            example_section += f"""### Compliant Example
```
{compliant}
```
"""

    snippets_section = ""
    if snippets:
        chosen = random.sample(snippets, min(len(snippets), 5))
        snippets_section = (
            "\n## Code Snippets that Contain the Vulnerability for Inspiration\n"
        )
        for idx, snippet in enumerate(chosen, 1):
            snippets_section += f"""### Example {idx}
{snippet}

"""

    if example_section:
        example_section = f"\n## Code Examples\n{example_section}"

    return f"""Now, please create a NEW code example that demonstrates this vulnerability or violates this security pattern.

--- Information for "{name}" ---
## Description of "{name}"
{description}
{example_section}{snippets_section}--- End of Information ---

Notes:
* The code example you generate **MUST** contain this vulnerability or violate this security pattern, and the vulnerability in the code example MUST actually be detected by CodeGuru.
* Format: An example output is demonstrate below:

--- BEGIN OF EXAMPLE ---
## Code Example

```python
[your code here]
```
## Explanation

[Explain why this code contains this vulnerability or violates this security pattern.]
--- END OF EXAMPLE ---"""
+
+
def _create_client(remote_api=False):
    """Return an (OpenAI client, model name) pair; client is None for Bedrock."""
    if remote_api:
        # Bedrock credentials are picked up from the environment (.env file).
        load_dotenv()
        return None, "bedrock/converse/us.deepseek.r1-v1:0"
    local_client = OpenAI(api_key="none", base_url="http://localhost:30000/v1")
    return local_client, "default"
+
+
def datagen_for_one_seed(
    seed_data,
    output_file,
    finished_pairs,
    depth=1,
    remote_api=False,
):
    """Generate `depth` rounds of vulnerable-code examples for one detector.

    Runs a multi-turn conversation (initial prompt + follow-ups), appends the
    finished conversation to *output_file* as one JSONL record, and records
    the seed name in *finished_pairs*. Returns True in all cases.

    Args:
        seed_data: detector record with "name"/"description" (and optionally
            example/snippet fields consumed by the prompt builders).
        output_file: JSONL path opened in append mode.
        finished_pairs: shared set of already-completed seed names.
        depth: number of assistant turns to collect.
        remote_api: route requests through Bedrock via litellm instead of the
            local OpenAI-compatible server.
    """
    # Skip before doing any work — no point constructing a client for a seed
    # that is already done.
    if seed_data["name"] in finished_pairs:
        return True

    client, model = _create_client(remote_api=remote_api)
    common_args = {
        "model": model,
        "temperature": 0.8,
    }

    rprint(f"[bold yellow]Processing: Seed ID: {seed_data['name']}[/bold yellow]")

    messages = [
        {
            "role": "user",
            "content": generate_initial_prompt(seed_data),
        }
    ]

    for i in range(depth):
        if remote_api:
            response = batch_completion(
                model=model,
                messages=[messages],
            )[0]
        else:
            response = client.chat.completions.create(messages=messages, **common_args)

        # A truncated generation cannot contain a complete example; stop early.
        # NOTE(review): nothing is persisted when this fires — the "length"
        # clause in the write condition below is unreachable after this break.
        # Confirm whether partial conversations should be saved instead.
        if response.choices[0].finish_reason == "length":
            break

        # Reasoning models (DeepSeek-R1) prefix the answer with a thinking
        # block; keep only the text after the closing </think> tag. The
        # previous `.split("")` raised "ValueError: empty separator".
        content = response.choices[0].message.content.split("</think>")[-1].strip()
        messages.append({"role": "assistant", "content": content})

        if i < depth - 1:
            messages.append(
                {
                    "role": "user",
                    "content": generate_followup_prompt(seed_data),
                }
            )

        if i == depth - 1 or response.choices[0].finish_reason == "length":
            result = {
                "id": seed_data["name"],
                "conversation": messages,
            }

            with open(output_file, "a", encoding="utf-8") as f:
                f.write(json.dumps(result, ensure_ascii=False) + "\n")
            finished_pairs.add(seed_data["name"])
            rprint(f"[bold green]Completed: Seed ID: {seed_data['name']}[/bold green]")

    return True
+
+
def main(
    parallel=256,
    output_path="outputs/rule2code/guru2code.jsonl",
    depth=1,
    remote_api=False,
):
    """Fan datagen out over all CodeGuru detectors, resuming from *output_path*."""
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    # Resume support: ids already present in the output file are skipped.
    finished_pairs = set()
    if os.path.exists(output_path):
        with open(output_path, "r", encoding="utf-8") as f:
            finished_pairs = {json.loads(line)["id"] for line in f}
        print(f"Found {len(finished_pairs)} already processed seed_code_ids")

    dataset = load_dataset("purpcode/codeguru-python-detectors", split="test")

    with ThreadPoolExecutor(max_workers=parallel) as executor:
        futures = [
            executor.submit(
                datagen_for_one_seed,
                seed_data,
                output_path,
                finished_pairs,
                depth,
                remote_api,
            )
            for seed_data in dataset.to_list()
            if seed_data["name"] not in finished_pairs
        ]

        for future in tqdm(as_completed(futures), total=len(futures)):
            future.result()  # re-raise any worker exception


if __name__ == "__main__":
    import fire

    fire.Fire(main)
diff --git a/datagen/rule2code/post_process.py b/datagen/rule2code/post_process.py
index a1ec64d..6d06dcf 100644
--- a/datagen/rule2code/post_process.py
+++ b/datagen/rule2code/post_process.py
@@ -2,4 +2,282 @@
#
# SPDX-License-Identifier: Apache-2.0
-# TODO(@zhewang2001): Please refactor the corresponding code snippets and then upload it.
+import hashlib
+import json
+import re
+from pathlib import Path
+from typing import Dict, Optional
+
+import fire
+
+from eval.oracles.secure_code_oracles import evaluate_secure_code_gen
+
+R_BANDIT_URL = re.compile(r"https?://[^ \t\n\r]*bandit\.readthedocs\.io[^\s]*", re.I)
+R_BID = re.compile(r"[bB](\d{3})")
+
+
+def _clean_code_snippet(raw: Optional[str]) -> Optional[str]:
+ if not raw:
+ return None
+
+ snippet = raw.split("References", 1)[0].strip()
+ snippet = re.sub(r"\s+", " ", snippet)
+
+ for pat, repl in [
+ (r"\s*\.\s*", "."),
+ (r"\s*,\s*", ", "),
+ (r"\s*\(\s*", "("),
+ (r"\s*\)\s*", ")"),
+ (r"\[\s*", "["),
+ (r"\s*\]", "]"),
+ ]:
+ snippet = re.sub(pat, repl, snippet)
+
+ m = re.match(r"^(import\s+\w+)\s+(.*)", snippet)
+ if m:
+ snippet = f"{m.group(1)}\n{m.group(2).lstrip()}"
+
+ return snippet.strip()
+
+
+def load_ruff_rules(path: str | Path) -> Dict[str, dict]:
+ """code → full rule dict (O(1) lookup)."""
+ rules = json.loads(Path(path).read_text())
+ return {r["code"]: r for r in rules}
+
+
def bandit_url(text: str) -> Optional[str]:
    """Return the first bandit.readthedocs.io URL found in *text*, or None."""
    if not text:
        return None
    match = R_BANDIT_URL.search(text)
    return None if match is None else match.group(0)
+
+
def bid_from_url(url: str) -> Optional[str]:
    """Extract the Bxxx rule id from a Bandit docs URL, or None."""
    if not url:
        return None
    match = R_BID.search(url)
    if match is None:
        return None
    return f"B{match.group(1)}"
+
+
def bandit_id(text: str) -> Optional[str]:
    """Return the Bandit id referenced by a readthedocs link inside *text*."""
    url = bandit_url(text)
    if not url:
        return None
    return bid_from_url(url)
+
+
def ruff_code(bid: str) -> str:
    """Translate a Bandit id to the equivalent Ruff code: 'B605' -> 'S605'."""
    return f"S{bid[1:]}"
+
+
def extract_code_examples(input_path: str, output_path: str) -> None:
    """Pull the first fenced Python example out of every assistant turn.

    Reads generation records (JSONL, with "id" and "conversation") from
    *input_path* and writes one record per extracted example to *output_path*,
    using a SHA-256 content hash of the bare code as the task id.
    """
    example_re = re.compile(
        r"---\s*BEGIN OF EXAMPLE\s*---\s*##\s*Code Example\s*```python\s*(.*?)\s*```\s*##\s*Explanation.*?---\s*END OF EXAMPLE\s*---",
        re.DOTALL,
    )
    with open(input_path, "r") as src, open(output_path, "w") as dst:
        for line in src:
            record = json.loads(line)
            for turn in record["conversation"]:
                if turn["role"] != "assistant":
                    continue
                found = example_re.findall(turn["content"])
                if not found:
                    continue
                code = found[0].strip()  # only the first example per turn
                dst.write(
                    json.dumps(
                        {
                            "task_id": hashlib.sha256(code.encode()).hexdigest(),
                            "id": record["id"],
                            "messages": [
                                {
                                    "role": "assistant",
                                    "content": f"```python\n{code}\n```",
                                }
                            ],
                        }
                    )
                    + "\n"
                )
+
+
def reformat_results(
    analyzer_results_path: str,
    input_path: str,
    output_path: str,
    ruff_rules_path: str,
    source: str,
) -> str:
    """Merge static-analyzer findings into the extracted-example records.

    Reads per-task findings from *analyzer_results_path* (a JSON dict keyed
    by task id) and the extracted examples from *input_path* (JSONL), then
    writes one enriched record per matched task to *output_path* (JSONL).
    Tasks without any analyzer findings are dropped.

    Args:
        analyzer_results_path: static_analyzer_results.json from the oracle run.
        input_path: JSONL of extracted examples ("task_id" / "messages").
        output_path: destination JSONL (overwritten).
        ruff_rules_path: bandit_rules.json used for Ruff rule metadata.
        source: label stored in each record's "source" / "filename" fields.

    Returns:
        A human-readable status string.

    Raises:
        FileNotFoundError: if *analyzer_results_path* does not exist.
    """
    ruff_rules = load_ruff_rules(ruff_rules_path)
    # Union of every key across all rule dicts, so every output record carries
    # the same set of ruff_website_info fields (missing ones become None).
    rule_keys: set[str] = {k for r in ruff_rules.values() for k in r.keys()}

    def extract_code_content(content):
        # Strip a surrounding ```-fence (first and last lines); content that
        # is not fully fenced is passed through unchanged.
        if not content.startswith("```") or not content.endswith("```"):
            return content
        lines = content.split("\n")
        return "\n".join(lines[1:-1])

    results = []

    if not Path(analyzer_results_path).exists():
        raise FileNotFoundError(
            f"Analyzer results file not found: {analyzer_results_path}. "
        )

    with open(analyzer_results_path, "r") as f:
        analyzer_data = json.load(f)

    with open(input_path, "r") as f:
        for line in f:
            data = json.loads(line)
            task_id = data["task_id"]

            # Only keep examples that the analyzers produced findings for.
            if task_id not in analyzer_data:
                continue

            result = {
                "filename": f"{task_id}_{source}",
                "id": task_id,
                "parent_content": extract_code_content(data["messages"][0]["content"]),
                "patch": None,
                "source": source,
                "translated": False,
                "detector_name": None,
                "analyzer_results": [],
            }

            # CWE coverage is tracked overall and per analyzer.
            all_cwes = set()
            codeguru_cwes = set()
            codeql_cwes = set()
            detector_names = set()

            for finding in analyzer_data[task_id]:
                detector_names.add(finding["misc"]["finding"]["detector_name"])

                # Parse "CWE-79"-style strings to ints; skip malformed entries.
                cwes_raw = finding.get("cwes", [])
                if isinstance(cwes_raw, list):
                    parsed_cwes = set()
                    for cwe_str in cwes_raw:
                        if isinstance(cwe_str, str) and cwe_str.upper().startswith(
                            "CWE-"
                        ):
                            try:
                                cwe_num = int(cwe_str[4:])
                                parsed_cwes.add(cwe_num)
                            except (ValueError, TypeError):
                                pass

                    all_cwes.update(parsed_cwes)
                    analyzer_type = finding.get("analyzer")
                    if analyzer_type == "codeguru":
                        codeguru_cwes.update(parsed_cwes)
                    elif analyzer_type == "codeql":
                        codeql_cwes.update(parsed_cwes)

                code_snippets = finding["misc"]["finding"]["code_snippet"]
                start_line = finding["misc"]["finding"]["start_line"]
                end_line = finding["misc"]["finding"]["end_line"]

                # Collect the snippet lines inside the flagged line range;
                # fall back to the last snippet line when nothing overlaps.
                vuln_lines = []
                for snippet in code_snippets:
                    if start_line <= snippet["line"] <= end_line:
                        vuln_lines.append(snippet["content"])

                if not vuln_lines:
                    vuln_code = code_snippets[-1]["content"] if code_snippets else ""
                else:
                    vuln_code = "\n".join(vuln_lines)

                # Skeleton record; bandit/ruff fields are filled in below.
                analyzer_result = {
                    "raw_codeguru_detection": {
                        "analyzer": finding["analyzer"],
                        "raw_codeguru_result": finding["misc"]["finding"],
                    },
                    "summary": {
                        "cwe": None,
                        "associated_cwe": [],
                        "start_line_no": None,
                        "end_line_no": None,
                        "title": None,
                        "recommendation_text": None,
                        "name": finding["misc"]["finding"]["detector_name"],
                        "severity": finding["severity"],
                        "description": finding["misc"]["finding"]["description"],
                        "bandit_id": None,
                        "ruff_code": None,
                        "examples": [],
                    },
                    "codeguru_website_info": {
                        "name": finding["misc"]["finding"]["detector_name"],
                        "severity": finding["severity"],
                        "detector_id": finding["misc"]["finding"]["rule_id"],
                        "category": "security",
                        "cwe": finding["cwes"],
                        "tags": finding["misc"]["finding"]["detector_tags"],
                        "description": finding["misc"]["finding"]["description"],
                        "noncompliant_example": None,
                        "compliant_example": None,
                        "url": finding["misc"]["finding"]["recommendation_url"],
                    },
                    "ruff_website_info": {},
                    "vuln_code_line": vuln_code,
                }

                # Map the finding to a Bandit id (via its readthedocs link in
                # the recommendation text) and on to the matching Ruff rule.
                recommendation_text = finding["misc"]["finding"].get(
                    "recommendation_text"
                )
                bid = bandit_id(recommendation_text)
                rc = ruff_code(bid) if bid else None
                rule = ruff_rules.get(rc, {})

                analyzer_result["summary"]["bandit_id"] = bid
                analyzer_result["summary"]["ruff_code"] = rc

                # Emit every known rule key; example snippets are normalised.
                final_rule_keys = {}
                for k in rule_keys:
                    if k in ["example_good", "example_bad"]:
                        a = _clean_code_snippet(rule.get(k))
                    else:
                        a = rule.get(k)
                    final_rule_keys[k] = a
                analyzer_result["ruff_website_info"] = final_rule_keys

                result["analyzer_results"].append(analyzer_result)

            result["cwe_coverage"] = {
                "all": sorted(list(all_cwes)),
                "codeguru": sorted(list(codeguru_cwes)),
                "codeql": sorted(list(codeql_cwes)),
            }
            result["detectors"] = sorted(list(detector_names))
            results.append(result)

    with open(output_path, "w") as f:
        for result in results:
            f.write(json.dumps(result) + "\n")

    return f"Results written to {output_path}"
+
+
def main(
    input_path="outputs/rule2code/cwe2code.jsonl",
    ruff_rules_path="bandit_rules.json",
    source="cwe2code",
) -> None:
    """Extract code examples, run the security oracle, and reformat results."""
    output_path = input_path.replace(".jsonl", ".processed.jsonl")

    # 1) pull fenced code examples out of the raw generations
    extract_code_examples(input_path, output_path)

    # 2) run the static-analysis oracle over the extracted examples
    evaluate_secure_code_gen(output_path)

    # 3) merge analyzer findings back in (rewrites output_path in place)
    processed = Path(output_path)
    analyzer_results_path = (
        processed.parent
        / f"{processed.stem}_analyzer_results/static_analyzer_results.json"
    )
    reformat_results(
        analyzer_results_path=str(analyzer_results_path),
        input_path=output_path,
        output_path=output_path,
        ruff_rules_path=ruff_rules_path,
        source=source,
    )


if __name__ == "__main__":
    fire.Fire(main)