forked from Magniquick/KeyCrawler
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathkeyboxer.py
More file actions
executable file
·106 lines (90 loc) · 3.77 KB
/
keyboxer.py
File metadata and controls
executable file
·106 lines (90 loc) · 3.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
import requests
import hashlib
import os
from lxml import etree
from pathlib import Path
from dotenv import load_dotenv
from check import keybox_check as CheckValid
session = requests.Session()
# Load environment variables from .env file
load_dotenv()
GITHUB_TOKEN = os.getenv("GITHUB_TOKEN")
if not GITHUB_TOKEN:
    raise ValueError("GITHUB_TOKEN is not set in the .env file")
# Search query (sent verbatim as the `q` parameter of the code-search API)
search_query = "<AndroidAttestation>"
search_url = f"https://api.github.com/search/code?q={search_query}"
# Headers for the API request
headers = {
    "Authorization": f"token {GITHUB_TOKEN}",
    "Accept": "application/vnd.github.v3+json",
}
# Directory where validated keybox XMLs are stored, keyed by content hash.
# Create it up front: downloads below write into it and would otherwise
# fail with FileNotFoundError on a fresh checkout.
save = Path(__file__).resolve().parent / "keys"
save.mkdir(exist_ok=True)
cache_file = Path(__file__).resolve().parent / "cache.txt"
# Previously-seen raw URLs, one per line (entries keep their trailing "\n").
# A missing cache file means "nothing cached yet" — the original leaked the
# file handle and crashed with FileNotFoundError on the first run.
try:
    with open(cache_file, "r") as f:
        cached_urls = set(f.readlines())
except FileNotFoundError:
    cached_urls = set()
# Function to fetch and print search results
# Fetch one page of GitHub code-search results and persist new keyboxes.
def fetch_and_process_results(page):
    """Fetch search-results page *page* and save any new, valid keybox XMLs.

    Each previously-unseen ``.xml`` hit is downloaded from its raw URL,
    canonicalized (C14N) so formatting differences don't create duplicate
    files, hashed with SHA-256, and written to ``keys/<sha256>.xml`` when
    ``CheckValid`` accepts its content.

    Returns True when the page contained items (pagination should continue),
    False otherwise.  Raises RuntimeError on a non-200 search response.
    """
    params = {"per_page": 100, "page": page}
    response = session.get(search_url, headers=headers, params=params)
    if response.status_code != 200:
        raise RuntimeError(f"Failed to retrieve search results: {response.status_code}")
    search_results = response.json()
    # Defensive: error/rate-limit payloads may lack "items".  The original
    # fell off the end (or raised KeyError) here — stop pagination explicitly.
    if "items" not in search_results:
        return False
    for item in search_results["items"]:
        file_name = item["name"]
        # Process only XML files
        if not file_name.lower().endswith(".xml"):
            continue
        # Rewrite the HTML view URL into the raw-content URL.
        raw_url: str = (
            item["html_url"].replace("github.com", "raw.githubusercontent.com").replace("/blob/", "/")
        )
        # Skip files already examined on a previous run; otherwise remember them.
        if raw_url + "\n" in cached_urls:
            continue
        cached_urls.add(raw_url + "\n")
        # Fetch the file content
        file_content = fetch_file_content(raw_url)
        # Parse the XML; silently skip documents that aren't well-formed.
        try:
            root = etree.fromstring(file_content)
        except etree.XMLSyntaxError:
            continue
        # Canonical form (C14N) so semantically-equal XML hashes identically.
        canonical_xml = etree.tostring(root, method="c14n")
        hash_value = hashlib.sha256(canonical_xml).hexdigest()
        file_name_save = save / (hash_value + ".xml")
        # Store only new, non-empty files that pass validation.
        if not file_name_save.exists() and file_content and CheckValid(file_content):
            print(f"{raw_url} is new")
            with open(file_name_save, "wb") as f:
                f.write(file_content)
    return len(search_results["items"]) > 0  # True if there could be more results
# Function to fetch file content
# Download the raw contents of a single search hit.
def fetch_file_content(url: str):
    """Return the body of *url* as bytes; raise RuntimeError on any non-200."""
    resp = session.get(url)
    if resp.status_code != 200:
        raise RuntimeError(f"Failed to download {url}")
    return resp.content
# Fetch all pages
# Walk every results page until a page comes back empty.
page = 1
has_more = True
while has_more:
    has_more = fetch_and_process_results(page)
    page += 1
# Persist the updated URL cache.  The original never closed the handle it
# opened for writing, so the data was only flushed at interpreter exit.
with open(cache_file, "w") as f:
    f.writelines(cached_urls)
# Re-validate every stored key file and offer to prune the ones that fail.
for file_path in save.glob("*.xml"):
    content = file_path.read_text()  # validator is given the text form here
    if CheckValid(content):
        continue
    # Confirm before removing anything — deletion is irreversible.
    answer = input(f"File '{file_path.name}' is no longer valid. Do you want to delete it? (y/N): ")
    if answer.lower() == "y":
        try:
            file_path.unlink()
            print(f"Deleted file: {file_path.name}")
        except OSError as err:
            print(f"Error deleting file {file_path.name}: {err}")
    else:
        print(f"Kept file: {file_path.name}")