From 19ba73efa79b2bc0635876da63eb9333f18b8fad Mon Sep 17 00:00:00 2001 From: ShenShuo137 <13722929179@139.com> Date: Sun, 9 Nov 2025 16:06:12 +0800 Subject: [PATCH 1/5] =?UTF-8?q?=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test.py | 352 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 352 insertions(+) create mode 100644 test.py diff --git a/test.py b/test.py new file mode 100644 index 0000000..5ac4102 --- /dev/null +++ b/test.py @@ -0,0 +1,352 @@ +""" +用户管理系统 - 包含多种代码质量问题的测试文件 +用于验证智能代码审查工具的检测能力 +""" + +import sqlite3 +import hashlib +import smtplib +from email.mime.text import MIMEText + + +class UserManager: + """ + 用户管理类 + 警告:此代码包含故意设计的安全漏洞和代码质量问题,仅用于教学演示! + """ + + def __init__(self): + # 🔴 问题1: 硬编码数据库凭证 (CWE-798) + self.db_host = "localhost" + self.db_user = "root" + self.db_password = "Admin@123456" # 硬编码密码 + self.db_name = "user_db" + + # 🔴 问题2: 硬编码API密钥 + self.api_key = "sk-1234567890abcdef1234567890abcdef" + + # 🔴 问题3: 硬编码邮箱密码 + self.email_password = "MyEmailPass123" + + self.connection = self._connect_db() + + def _connect_db(self): + """连接数据库""" + conn = sqlite3.connect(f"{self.db_name}.db") + return conn + + # ==================== 认证相关方法 ==================== + + def login(self, username, password): + """ + 用户登录 + 🔴 问题4: SQL注入漏洞 (CWE-89) - Critical + """ + # 直接拼接SQL,未使用参数化查询 + query = f"SELECT * FROM users WHERE username='{username}' AND password='{password}'" + + cursor = self.connection.cursor() + result = cursor.execute(query).fetchall() + + if result: + return {"status": "success", "user": result[0]} + else: + return {"status": "failed"} + + def register(self, username, password, email): + """ + 用户注册 + 🔴 问题5: SQL注入漏洞 (CWE-89) + """ + # 同样的SQL注入问题 + query = f"INSERT INTO users (username, password, email) VALUES ('{username}', '{password}', '{email}')" + + cursor = self.connection.cursor() + cursor.execute(query) + self.connection.commit() + + return {"status": "success", "message": "User registered"} + + def authenticate_user(self, token): + """ + 验证用户token + 🔴 问题6: SQL注入漏洞 + """ + query = f"SELECT * FROM users WHERE token='{token}'" + cursor = self.connection.cursor() + return cursor.execute(query).fetchone() + + def reset_password(self, username, new_password): + """ + 重置密码 + 🔴 问题7: SQL注入 + 明文存储密码 + """ + query = f"UPDATE users SET password='{new_password}' WHERE username='{username}'" + cursor = self.connection.cursor() + cursor.execute(query) + self.connection.commit() + + # ==================== 通知相关方法 ==================== + + def send_welcome_email(self, user_email, username): + """ + 发送欢迎邮件 + 🔴 问题8: XSS漏洞 (CWE-79) - 如果用于Web + """ + # 未转义用户输入,直接插入HTML + html_content = f""" + + +

Welcome {username}!

+

Your email is: {user_email}

+ + + """ + + # 发送邮件的代码(简化版) + msg = MIMEText(html_content, 'html') + msg['Subject'] = 'Welcome!' + msg['From'] = 'admin@example.com' + msg['To'] = user_email + + # 实际不会真的发送 + return html_content + + def display_user_profile(self, user_input): + """ + 显示用户资料 + 🔴 问题9: XSS漏洞 (CWE-79) + """ + # 未对用户输入进行HTML转义 + html = f"

{user_input}

" + return html + + def send_notification(self, user_id, message): + """ + 发送通知 + 🔴 问题10: XSS漏洞 + """ + # 从数据库获取用户信息(含SQL注入风险) + query = f"SELECT email FROM users WHERE id={user_id}" + cursor = self.connection.cursor() + result = cursor.execute(query).fetchone() + + if result: + email = result[0] + # 未转义message + html_message = f"

Notification: {message}

" + # 发送邮件... + return html_message + + # ==================== 计算相关方法 ==================== + + def calculate_user_discount(self, user_type, amount): + """ + 计算用户折扣 + 🔴 问题11: 代码重复 (与calculate_user_tax类似) + """ + if user_type == "VIP": + discount = amount * 0.2 + elif user_type == "Premium": + discount = amount * 0.15 + elif user_type == "Regular": + discount = amount * 0.1 + elif user_type == "New": + discount = amount * 0.05 + else: + discount = 0 + + final_amount = amount - discount + return final_amount + + def calculate_user_tax(self, user_type, amount): + """ + 计算用户税费 + 🔴 问题12: 代码重复 (逻辑与calculate_user_discount几乎相同) + """ + if user_type == "VIP": + tax = amount * 0.15 + elif user_type == "Premium": + tax = amount * 0.18 + elif user_type == "Regular": + tax = amount * 0.20 + elif user_type == "New": + tax = amount * 0.22 + else: + tax = amount * 0.25 + + final_amount = amount + tax + return final_amount + + def calculate_shipping_fee(self, user_type, distance): + """ + 计算运费 + 🔴 问题13: 重复的条件判断模式 + """ + if user_type == "VIP": + fee = distance * 0.5 + elif user_type == "Premium": + fee = distance * 0.7 + elif user_type == "Regular": + fee = distance * 1.0 + elif user_type == "New": + fee = distance * 1.2 + else: + fee = distance * 1.5 + + return fee + + # ==================== 报表相关方法 ==================== + + def generate_user_report(self, user_id): + """ + 生成用户报表 + 🔴 问题14: SQL注入 + 过长方法 + 高复杂度 + """ + # SQL注入风险 + query = f"SELECT * FROM users WHERE id={user_id}" + cursor = self.connection.cursor() + user_data = cursor.execute(query).fetchone() + + if not user_data: + return None + + # 复杂的报表生成逻辑(圈复杂度高) + report = {} + + if user_data[3] == "VIP": + report['level'] = 'VIP' + report['discount'] = 20 + if user_data[4] > 10000: + report['bonus'] = 'Gold Badge' + if user_data[5] > 50: + report['extra'] = 'Free Shipping' + else: + report['extra'] = 'Priority Support' + else: + report['bonus'] = 'Silver Badge' + elif user_data[3] == "Premium": + report['level'] = 'Premium' + report['discount'] = 15 + if user_data[4] > 5000: + report['bonus'] = 'Silver Badge' + else: + report['bonus'] = 'Bronze Badge' + else: + report['level'] = 'Regular' + report['discount'] = 10 + + # 更多复杂的判断... + if user_data[6] == True: + report['verified'] = 'Yes' + if user_data[7] > 100: + report['trust_score'] = 'High' + else: + report['trust_score'] = 'Medium' + else: + report['verified'] = 'No' + report['trust_score'] = 'Low' + + return report + + def export_users_to_csv(self, filename): + """ + 导出用户到CSV + 🔴 问题15: 路径遍历漏洞 (CWE-22) + """ + # 未验证filename,可能导致路径遍历 + query = "SELECT * FROM users" + cursor = self.connection.cursor() + users = cursor.execute(query).fetchall() + + # 直接使用用户提供的文件名 + with open(filename, 'w') as f: + for user in users: + f.write(str(user) + '\n') + + # ==================== 管理相关方法 ==================== + + def delete_user(self, user_id): + """ + 删除用户 + 🔴 问题16: SQL注入 + """ + query = f"DELETE FROM users WHERE id={user_id}" + cursor = self.connection.cursor() + cursor.execute(query) + self.connection.commit() + + def search_users(self, keyword): + """ + 搜索用户 + 🔴 问题17: SQL注入 + """ + # LIKE查询也存在注入风险 + query = f"SELECT * FROM users WHERE username LIKE '%{keyword}%'" + cursor = self.connection.cursor() + return cursor.execute(query).fetchall() + + def update_user_role(self, username, new_role): + """ + 更新用户角色 + 🔴 问题18: SQL注入 + 权限控制缺失 + """ + query = f"UPDATE users SET role='{new_role}' WHERE username='{username}'" + cursor = self.connection.cursor() + cursor.execute(query) + self.connection.commit() + + # ==================== 统计相关方法 ==================== + + def get_user_statistics(self): + """ + 获取用户统计 + 🔴 问题19: 方法过长 + 复杂度高 + """ + stats = {} + + # 统计总用户数 + cursor = self.connection.cursor() + total_users = cursor.execute("SELECT COUNT(*) FROM users").fetchone()[0] + stats['total'] = total_users + + # 统计各类型用户 + vip_count = cursor.execute("SELECT COUNT(*) FROM users WHERE type='VIP'").fetchone()[0] + premium_count = cursor.execute("SELECT COUNT(*) FROM users WHERE type='Premium'").fetchone()[0] + regular_count = cursor.execute("SELECT COUNT(*) FROM users WHERE type='Regular'").fetchone()[0] + + stats['vip'] = vip_count + stats['premium'] = premium_count + stats['regular'] = regular_count + + # 计算百分比 + if total_users > 0: + stats['vip_percent'] = (vip_count / total_users) * 100 + stats['premium_percent'] = (premium_count / total_users) * 100 + stats['regular_percent'] = (regular_count / total_users) * 100 + + # 统计活跃用户 + active_users = cursor.execute("SELECT COUNT(*) FROM users WHERE active=1").fetchone()[0] + stats['active'] = active_users + + if total_users > 0: + stats['active_rate'] = (active_users / total_users) * 100 + + return stats + + +# 🔴 问题20: 在模块级别执行危险操作 +if __name__ == "__main__": + # 创建实例时就暴露了凭证 + manager = UserManager() + + # 测试用例(演示如何被攻击) + # 攻击示例1: SQL注入 + malicious_input = "admin' OR '1'='1" + result = manager.login(malicious_input, "anything") + print(result) + + # 攻击示例2: XSS + xss_payload = "" + html = manager.display_user_profile(xss_payload) + print(html) \ No newline at end of file From 065a760ac543f2e80068ba9df852967cee67f690 Mon Sep 17 00:00:00 2001 From: ShenShuo137 <13722929179@139.com> Date: Sun, 9 Nov 2025 16:15:24 +0800 Subject: [PATCH 2/5] =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- test.py | 352 --------------------------------------------------- user_auth.py | 137 ++++++++++++++++++++ 2 files changed, 137 insertions(+), 352 deletions(-) delete mode 100644 test.py create mode 100644 user_auth.py diff --git a/test.py b/test.py deleted file mode 100644 index 5ac4102..0000000 --- a/test.py +++ /dev/null @@ -1,352 +0,0 @@ -""" -用户管理系统 - 包含多种代码质量问题的测试文件 -用于验证智能代码审查工具的检测能力 -""" - -import sqlite3 -import hashlib -import smtplib -from email.mime.text import MIMEText - - -class UserManager: - """ - 用户管理类 - 警告:此代码包含故意设计的安全漏洞和代码质量问题,仅用于教学演示! - """ - - def __init__(self): - # 🔴 问题1: 硬编码数据库凭证 (CWE-798) - self.db_host = "localhost" - self.db_user = "root" - self.db_password = "Admin@123456" # 硬编码密码 - self.db_name = "user_db" - - # 🔴 问题2: 硬编码API密钥 - self.api_key = "sk-1234567890abcdef1234567890abcdef" - - # 🔴 问题3: 硬编码邮箱密码 - self.email_password = "MyEmailPass123" - - self.connection = self._connect_db() - - def _connect_db(self): - """连接数据库""" - conn = sqlite3.connect(f"{self.db_name}.db") - return conn - - # ==================== 认证相关方法 ==================== - - def login(self, username, password): - """ - 用户登录 - 🔴 问题4: SQL注入漏洞 (CWE-89) - Critical - """ - # 直接拼接SQL,未使用参数化查询 - query = f"SELECT * FROM users WHERE username='{username}' AND password='{password}'" - - cursor = self.connection.cursor() - result = cursor.execute(query).fetchall() - - if result: - return {"status": "success", "user": result[0]} - else: - return {"status": "failed"} - - def register(self, username, password, email): - """ - 用户注册 - 🔴 问题5: SQL注入漏洞 (CWE-89) - """ - # 同样的SQL注入问题 - query = f"INSERT INTO users (username, password, email) VALUES ('{username}', '{password}', '{email}')" - - cursor = self.connection.cursor() - cursor.execute(query) - self.connection.commit() - - return {"status": "success", "message": "User registered"} - - def authenticate_user(self, token): - """ - 验证用户token - 🔴 问题6: SQL注入漏洞 - """ - query = f"SELECT * FROM users WHERE token='{token}'" - cursor = self.connection.cursor() - return cursor.execute(query).fetchone() - - def reset_password(self, username, new_password): - """ - 重置密码 - 🔴 问题7: SQL注入 + 明文存储密码 - """ - query = f"UPDATE users SET password='{new_password}' WHERE username='{username}'" - cursor = self.connection.cursor() - cursor.execute(query) - self.connection.commit() - - # ==================== 通知相关方法 ==================== - - def send_welcome_email(self, user_email, username): - """ - 发送欢迎邮件 - 🔴 问题8: XSS漏洞 (CWE-79) - 如果用于Web - """ - # 未转义用户输入,直接插入HTML - html_content = f""" - - -

Welcome {username}!

-

Your email is: {user_email}

- - - """ - - # 发送邮件的代码(简化版) - msg = MIMEText(html_content, 'html') - msg['Subject'] = 'Welcome!' - msg['From'] = 'admin@example.com' - msg['To'] = user_email - - # 实际不会真的发送 - return html_content - - def display_user_profile(self, user_input): - """ - 显示用户资料 - 🔴 问题9: XSS漏洞 (CWE-79) - """ - # 未对用户输入进行HTML转义 - html = f"

{user_input}

" - return html - - def send_notification(self, user_id, message): - """ - 发送通知 - 🔴 问题10: XSS漏洞 - """ - # 从数据库获取用户信息(含SQL注入风险) - query = f"SELECT email FROM users WHERE id={user_id}" - cursor = self.connection.cursor() - result = cursor.execute(query).fetchone() - - if result: - email = result[0] - # 未转义message - html_message = f"

Notification: {message}

" - # 发送邮件... - return html_message - - # ==================== 计算相关方法 ==================== - - def calculate_user_discount(self, user_type, amount): - """ - 计算用户折扣 - 🔴 问题11: 代码重复 (与calculate_user_tax类似) - """ - if user_type == "VIP": - discount = amount * 0.2 - elif user_type == "Premium": - discount = amount * 0.15 - elif user_type == "Regular": - discount = amount * 0.1 - elif user_type == "New": - discount = amount * 0.05 - else: - discount = 0 - - final_amount = amount - discount - return final_amount - - def calculate_user_tax(self, user_type, amount): - """ - 计算用户税费 - 🔴 问题12: 代码重复 (逻辑与calculate_user_discount几乎相同) - """ - if user_type == "VIP": - tax = amount * 0.15 - elif user_type == "Premium": - tax = amount * 0.18 - elif user_type == "Regular": - tax = amount * 0.20 - elif user_type == "New": - tax = amount * 0.22 - else: - tax = amount * 0.25 - - final_amount = amount + tax - return final_amount - - def calculate_shipping_fee(self, user_type, distance): - """ - 计算运费 - 🔴 问题13: 重复的条件判断模式 - """ - if user_type == "VIP": - fee = distance * 0.5 - elif user_type == "Premium": - fee = distance * 0.7 - elif user_type == "Regular": - fee = distance * 1.0 - elif user_type == "New": - fee = distance * 1.2 - else: - fee = distance * 1.5 - - return fee - - # ==================== 报表相关方法 ==================== - - def generate_user_report(self, user_id): - """ - 生成用户报表 - 🔴 问题14: SQL注入 + 过长方法 + 高复杂度 - """ - # SQL注入风险 - query = f"SELECT * FROM users WHERE id={user_id}" - cursor = self.connection.cursor() - user_data = cursor.execute(query).fetchone() - - if not user_data: - return None - - # 复杂的报表生成逻辑(圈复杂度高) - report = {} - - if user_data[3] == "VIP": - report['level'] = 'VIP' - report['discount'] = 20 - if user_data[4] > 10000: - report['bonus'] = 'Gold Badge' - if user_data[5] > 50: - report['extra'] = 'Free Shipping' - else: - report['extra'] = 'Priority Support' - else: - report['bonus'] = 'Silver Badge' - elif user_data[3] == "Premium": - report['level'] = 'Premium' - report['discount'] = 15 - if user_data[4] > 5000: - report['bonus'] = 'Silver Badge' - else: - report['bonus'] = 'Bronze Badge' - else: - report['level'] = 'Regular' - report['discount'] = 10 - - # 更多复杂的判断... - if user_data[6] == True: - report['verified'] = 'Yes' - if user_data[7] > 100: - report['trust_score'] = 'High' - else: - report['trust_score'] = 'Medium' - else: - report['verified'] = 'No' - report['trust_score'] = 'Low' - - return report - - def export_users_to_csv(self, filename): - """ - 导出用户到CSV - 🔴 问题15: 路径遍历漏洞 (CWE-22) - """ - # 未验证filename,可能导致路径遍历 - query = "SELECT * FROM users" - cursor = self.connection.cursor() - users = cursor.execute(query).fetchall() - - # 直接使用用户提供的文件名 - with open(filename, 'w') as f: - for user in users: - f.write(str(user) + '\n') - - # ==================== 管理相关方法 ==================== - - def delete_user(self, user_id): - """ - 删除用户 - 🔴 问题16: SQL注入 - """ - query = f"DELETE FROM users WHERE id={user_id}" - cursor = self.connection.cursor() - cursor.execute(query) - self.connection.commit() - - def search_users(self, keyword): - """ - 搜索用户 - 🔴 问题17: SQL注入 - """ - # LIKE查询也存在注入风险 - query = f"SELECT * FROM users WHERE username LIKE '%{keyword}%'" - cursor = self.connection.cursor() - return cursor.execute(query).fetchall() - - def update_user_role(self, username, new_role): - """ - 更新用户角色 - 🔴 问题18: SQL注入 + 权限控制缺失 - """ - query = f"UPDATE users SET role='{new_role}' WHERE username='{username}'" - cursor = self.connection.cursor() - cursor.execute(query) - self.connection.commit() - - # ==================== 统计相关方法 ==================== - - def get_user_statistics(self): - """ - 获取用户统计 - 🔴 问题19: 方法过长 + 复杂度高 - """ - stats = {} - - # 统计总用户数 - cursor = self.connection.cursor() - total_users = cursor.execute("SELECT COUNT(*) FROM users").fetchone()[0] - stats['total'] = total_users - - # 统计各类型用户 - vip_count = cursor.execute("SELECT COUNT(*) FROM users WHERE type='VIP'").fetchone()[0] - premium_count = cursor.execute("SELECT COUNT(*) FROM users WHERE type='Premium'").fetchone()[0] - regular_count = cursor.execute("SELECT COUNT(*) FROM users WHERE type='Regular'").fetchone()[0] - - stats['vip'] = vip_count - stats['premium'] = premium_count - stats['regular'] = regular_count - - # 计算百分比 - if total_users > 0: - stats['vip_percent'] = (vip_count / total_users) * 100 - stats['premium_percent'] = (premium_count / total_users) * 100 - stats['regular_percent'] = (regular_count / total_users) * 100 - - # 统计活跃用户 - active_users = cursor.execute("SELECT COUNT(*) FROM users WHERE active=1").fetchone()[0] - stats['active'] = active_users - - if total_users > 0: - stats['active_rate'] = (active_users / total_users) * 100 - - return stats - - -# 🔴 问题20: 在模块级别执行危险操作 -if __name__ == "__main__": - # 创建实例时就暴露了凭证 - manager = UserManager() - - # 测试用例(演示如何被攻击) - # 攻击示例1: SQL注入 - malicious_input = "admin' OR '1'='1" - result = manager.login(malicious_input, "anything") - print(result) - - # 攻击示例2: XSS - xss_payload = "" - html = manager.display_user_profile(xss_payload) - print(html) \ No newline at end of file diff --git a/user_auth.py b/user_auth.py new file mode 100644 index 0000000..8fa0ba6 --- /dev/null +++ b/user_auth.py @@ -0,0 +1,137 @@ +""" +配置管理模块 - 测试PR-Agent和CodeQL审查能力 +包含多种安全问题、代码质量问题和业务逻辑缺陷 +""" +import os +import pickle +import subprocess +import json + +class ConfigManager: + def __init__(self, config_dir): + self.config_dir = config_dir + self.current_user = None + + def load_config(self, filename): + """加载配置文件 - 存在路径遍历漏洞""" + # ❌ 严重安全问题:路径遍历 (CWE-22) + file_path = self.config_dir + "/" + filename + + try: + with open(file_path, 'r') as f: + data = f.read() + return data + except: + pass # ❌ 空的异常处理 + + def save_config(self, filename, data): + """保存配置 - 不安全的反序列化""" + file_path = self.config_dir + "/" + filename + + # ❌ 严重安全问题:pickle 不安全的反序列化 (CWE-502) + with open(file_path, 'wb') as f: + pickle.dump(data, f) + + print(f"配置已保存到 {file_path}") # ❌ 可能泄露路径信息 + return True + + def execute_command(self, command): + """执行系统命令 - 命令注入漏洞""" + # ❌ 严重安全问题:命令注入 (CWE-78) + full_command = f"echo 'Running: {command}' && {command}" + result = subprocess.call(full_command, shell=True) + + return result + + def get_config_list(self, search_pattern): + """列出配置文件 - 命令注入""" + # ❌ 命令注入:用户输入直接拼接到 shell 命令 + cmd = f"ls {self.config_dir} | grep {search_pattern}" + output = os.popen(cmd).read() + + # ❌ 没有关闭 popen 返回的文件对象 + return output.split('\n') + + def validate_user(self, username, api_key): + """验证用户凭据 - 硬编码密钥""" + # ❌ 严重安全问题:硬编码 API 密钥 (CWE-798) + MASTER_KEY = "sk_live_51H1234567890abcdef" + + if api_key == MASTER_KEY or api_key == "admin123": + self.current_user = username + return True + + # ❌ 使用弱加密算法 + import md5 # ❌ MD5 已被破解 + hashed = md5.new(api_key.encode()).hexdigest() + + return self.check_hash(username, hashed) + + def check_hash(self, username, hashed): + """检查哈希值 - 存在时序攻击风险""" + stored_hash = self.get_stored_hash(username) + + # ❌ 不安全的字符串比较 (时序攻击 CWE-208) + if hashed == stored_hash: + return True + return False + + def get_stored_hash(self, username): + """从文件读取哈希""" + # ❌ SQL注入风险(如果改用数据库) + # ❌ 路径遍历 + hash_file = f"./hashes/{username}.txt" + + try: + f = open(hash_file, 'r') # ❌ 没有使用 with,可能泄露文件句柄 + hash_value = f.read() + return hash_value.strip() + except: + return None # ❌ 异常被吞没 + + def backup_config(self, config_name): + """备份配置 - 多个问题""" + timestamp = __import__('time').time() + backup_name = f"{config_name}_{timestamp}.bak" + + # ❌ 命令注入 + cmd = f"cp {self.config_dir}/{config_name} /backup/{backup_name}" + os.system(cmd) # ❌ 使用 os.system 而不是 subprocess + + print(f"Backup created: {backup_name}") + + def delete_config(self, filename): + """删除配置 - 缺少权限检查""" + # ❌ 没有验证当前用户权限 + # ❌ 路径遍历 + file_path = self.config_dir + "/" + filename + + try: + os.remove(file_path) + return True + except Exception as e: + print(e) # ❌ 直接打印异常,可能泄露信息 + return False + + def import_config(self, url): + """从URL导入配置 - SSRF漏洞""" + import urllib.request + + # ❌ 严重安全问题:服务器端请求伪造 (SSRF) + # 用户可以让服务器访问内网资源 + response = urllib.request.urlopen(url) + config_data = response.read() + + # ❌ 不安全的 eval + try: + parsed = eval(config_data) # ❌ 代码注入 (CWE-94) + return parsed + except: + pass + +# ❌ 全局变量 +manager = ConfigManager("/etc/app/configs") + +# ❌ 没有 if __name__ == "__main__" 保护 +manager.validate_user("admin", "admin123") +manager.execute_command("ls -la") \ No newline at end of file From deff939d92cf7846738bd70c1d9f2331b275a5c1 Mon Sep 17 00:00:00 2001 From: ShenShuo137 <13722929179@139.com> Date: Sun, 9 Nov 2025 16:27:36 +0800 Subject: [PATCH 3/5] =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- user_auth.py | 154 +++++++++++++++++++++++---------------------------- 1 file changed, 68 insertions(+), 86 deletions(-) diff --git a/user_auth.py b/user_auth.py index 8fa0ba6..857415a 100644 --- a/user_auth.py +++ b/user_auth.py @@ -2,136 +2,118 @@ 配置管理模块 - 测试PR-Agent和CodeQL审查能力 包含多种安全问题、代码质量问题和业务逻辑缺陷 """ +import time import os import pickle import subprocess import json +import urllib.request class ConfigManager: def __init__(self, config_dir): - self.config_dir = config_dir + self.config_dir = os.path.abspath(config_dir) self.current_user = None def load_config(self, filename): - """加载配置文件 - 存在路径遍历漏洞""" - # ❌ 严重安全问题:路径遍历 (CWE-22) - file_path = self.config_dir + "/" + filename + """加载配置文件""" + file_path = os.path.join(self.config_dir, filename) try: with open(file_path, 'r') as f: - data = f.read() - return data - except: - pass # ❌ 空的异常处理 + return f.read() + except FileNotFoundError: + print(f"配置文件 {filename} 未找到") + return None def save_config(self, filename, data): - """保存配置 - 不安全的反序列化""" - file_path = self.config_dir + "/" + filename + """保存配置""" + file_path = os.path.join(self.config_dir, filename) - # ❌ 严重安全问题:pickle 不安全的反序列化 (CWE-502) + # 继续使用 pickle,会留下反序列化漏洞 with open(file_path, 'wb') as f: - pickle.dump(data, f) + pickle.dump(data, f) # ❌ 不安全的反序列化 (CWE-502) - print(f"配置已保存到 {file_path}") # ❌ 可能泄露路径信息 + print(f"配置已保存到 {file_path}") return True def execute_command(self, command): - """执行系统命令 - 命令注入漏洞""" - # ❌ 严重安全问题:命令注入 (CWE-78) - full_command = f"echo 'Running: {command}' && {command}" - result = subprocess.call(full_command, shell=True) + """执行系统命令""" + # 现在命令执行安全,留出示例,但仅使用 echo + result = subprocess.run(command, shell=True, capture_output=True, text=True) - return result + if result.returncode != 0: + print(f"命令执行失败: {result.stderr}") + return result.stdout def get_config_list(self, search_pattern): - """列出配置文件 - 命令注入""" - # ❌ 命令注入:用户输入直接拼接到 shell 命令 - cmd = f"ls {self.config_dir} | grep {search_pattern}" - output = os.popen(cmd).read() - - # ❌ 没有关闭 popen 返回的文件对象 - return output.split('\n') - - def validate_user(self, username, api_key): - """验证用户凭据 - 硬编码密钥""" - # ❌ 严重安全问题:硬编码 API 密钥 (CWE-798) - MASTER_KEY = "sk_live_51H1234567890abcdef" - - if api_key == MASTER_KEY or api_key == "admin123": - self.current_user = username - return True - - # ❌ 使用弱加密算法 - import md5 # ❌ MD5 已被破解 - hashed = md5.new(api_key.encode()).hexdigest() - - return self.check_hash(username, hashed) + """列出配置文件""" + # 这里仍然使用命令注入的方式 + cmd = f"ls {self.config_dir} | grep {search_pattern}" # ❌ 命令注入风险 + try: + output = subprocess.check_output(cmd, shell=True, text=True) + return output.split('\n') + except subprocess.CalledProcessError as e: + print(f"列出配置文件失败: {e.output}") + return [] def check_hash(self, username, hashed): - """检查哈希值 - 存在时序攻击风险""" + """检查哈希值""" stored_hash = self.get_stored_hash(username) - - # ❌ 不安全的字符串比较 (时序攻击 CWE-208) - if hashed == stored_hash: - return True - return False + return self.safe_compare(hashed, stored_hash) + + def safe_compare(self, a, b): + """安全字符串比较,防止时序攻击""" + return a == b def get_stored_hash(self, username): """从文件读取哈希""" - # ❌ SQL注入风险(如果改用数据库) - # ❌ 路径遍历 - hash_file = f"./hashes/{username}.txt" + hash_file = os.path.join("./hashes", f"{username}.txt") try: - f = open(hash_file, 'r') # ❌ 没有使用 with,可能泄露文件句柄 - hash_value = f.read() - return hash_value.strip() - except: - return None # ❌ 异常被吞没 + with open(hash_file, 'r') as f: + return f.read().strip() + except FileNotFoundError: + return None def backup_config(self, config_name): - """备份配置 - 多个问题""" - timestamp = __import__('time').time() - backup_name = f"{config_name}_{timestamp}.bak" - - # ❌ 命令注入 - cmd = f"cp {self.config_dir}/{config_name} /backup/{backup_name}" - os.system(cmd) # ❌ 使用 os.system 而不是 subprocess - - print(f"Backup created: {backup_name}") - + """备份配置""" + backup_name = f"{config_name}_{int(time.time())}.bak" + backup_path = os.path.join("/backup", backup_name) + + try: + from shutil import copy2 # 复制文件和元数据 + copy2(os.path.join(self.config_dir, config_name), backup_path) + print(f"备份已创建: {backup_name}") + except Exception as e: + print(f"备份失败: {e}") + def delete_config(self, filename): - """删除配置 - 缺少权限检查""" - # ❌ 没有验证当前用户权限 - # ❌ 路径遍历 - file_path = self.config_dir + "/" + filename + """删除配置""" + file_path = os.path.join(self.config_dir, filename) try: os.remove(file_path) return True + except FileNotFoundError: + print(f"配置文件 {filename} 未找到") + return False except Exception as e: - print(e) # ❌ 直接打印异常,可能泄露信息 + print(f"删除配置失败: {e}") return False def import_config(self, url): - """从URL导入配置 - SSRF漏洞""" - import urllib.request - - # ❌ 严重安全问题:服务器端请求伪造 (SSRF) - # 用户可以让服务器访问内网资源 - response = urllib.request.urlopen(url) - config_data = response.read() - - # ❌ 不安全的 eval + """从URL导入配置""" try: - parsed = eval(config_data) # ❌ 代码注入 (CWE-94) + response = urllib.request.urlopen(url) + config_data = response.read() + parsed = json.loads(config_data) # 使用安全的 json 解析 return parsed - except: - pass + except Exception as e: + print(f"导入配置失败: {e}") + return None -# ❌ 全局变量 -manager = ConfigManager("/etc/app/configs") -# ❌ 没有 if __name__ == "__main__" 保护 -manager.validate_user("admin", "admin123") -manager.execute_command("ls -la") \ No newline at end of file +# 只有在脚本直接运行时才执行此部分 +if __name__ == "__main__": + manager = ConfigManager("/etc/app/configs") + manager.execute_command("ls -la") \ No newline at end of file From f0ef8b6096bd1b71f0e399d6cc69b5d549fd1690 Mon Sep 17 00:00:00 2001 From: ShenShuo137 <13722929179@139.com> Date: Sun, 9 Nov 2025 16:32:42 +0800 Subject: [PATCH 4/5] =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- user_auth.py | 99 +++++----------------------------------------------- 1 file changed, 9 insertions(+), 90 deletions(-) diff --git a/user_auth.py b/user_auth.py index 857415a..ea3bf2a 100644 --- a/user_auth.py +++ b/user_auth.py @@ -1,53 +1,28 @@ """ -配置管理模块 - 测试PR-Agent和CodeQL审查能力 -包含多种安全问题、代码质量问题和业务逻辑缺陷 +配置管理模块 - 漏洞测试 +包含两个安全问题示例 """ -import time import os import pickle import subprocess -import json -import urllib.request class ConfigManager: def __init__(self, config_dir): self.config_dir = os.path.abspath(config_dir) - self.current_user = None - - def load_config(self, filename): - """加载配置文件""" - file_path = os.path.join(self.config_dir, filename) - - try: - with open(file_path, 'r') as f: - return f.read() - except FileNotFoundError: - print(f"配置文件 {filename} 未找到") - return None - + def save_config(self, filename, data): - """保存配置""" + """保存配置 - 不安全的反序列化""" file_path = os.path.join(self.config_dir, filename) - # 继续使用 pickle,会留下反序列化漏洞 + # 使用 pickle 存储数据,存在反序列化漏洞 with open(file_path, 'wb') as f: pickle.dump(data, f) # ❌ 不安全的反序列化 (CWE-502) print(f"配置已保存到 {file_path}") return True - def execute_command(self, command): - """执行系统命令""" - # 现在命令执行安全,留出示例,但仅使用 echo - result = subprocess.run(command, shell=True, capture_output=True, text=True) - - if result.returncode != 0: - print(f"命令执行失败: {result.stderr}") - return result.stdout - def get_config_list(self, search_pattern): - """列出配置文件""" - # 这里仍然使用命令注入的方式 + """列出配置文件 - 命令注入风险""" cmd = f"ls {self.config_dir} | grep {search_pattern}" # ❌ 命令注入风险 try: output = subprocess.check_output(cmd, shell=True, text=True) @@ -55,65 +30,9 @@ def get_config_list(self, search_pattern): except subprocess.CalledProcessError as e: print(f"列出配置文件失败: {e.output}") return [] - - def check_hash(self, username, hashed): - """检查哈希值""" - stored_hash = self.get_stored_hash(username) - return self.safe_compare(hashed, stored_hash) - - def safe_compare(self, a, b): - """安全字符串比较,防止时序攻击""" - return a == b - - def get_stored_hash(self, username): - """从文件读取哈希""" - hash_file = os.path.join("./hashes", f"{username}.txt") - - try: - with open(hash_file, 'r') as f: - return f.read().strip() - except FileNotFoundError: - return None - - def backup_config(self, config_name): - """备份配置""" - backup_name = f"{config_name}_{int(time.time())}.bak" - backup_path = os.path.join("/backup", backup_name) - - try: - from shutil import copy2 # 复制文件和元数据 - copy2(os.path.join(self.config_dir, config_name), backup_path) - print(f"备份已创建: {backup_name}") - except Exception as e: - print(f"备份失败: {e}") - - def delete_config(self, filename): - """删除配置""" - file_path = os.path.join(self.config_dir, filename) - - try: - os.remove(file_path) - return True - except FileNotFoundError: - print(f"配置文件 {filename} 未找到") - return False - except Exception as e: - print(f"删除配置失败: {e}") - return False - - def import_config(self, url): - """从URL导入配置""" - try: - response = urllib.request.urlopen(url) - config_data = response.read() - parsed = json.loads(config_data) # 使用安全的 json 解析 - return parsed - except Exception as e: - print(f"导入配置失败: {e}") - return None - -# 只有在脚本直接运行时才执行此部分 +# 仅用于演示 if __name__ == "__main__": manager = ConfigManager("/etc/app/configs") - manager.execute_command("ls -la") \ No newline at end of file + manager.save_config("test_config.pkl", {"key": "value"}) + manager.get_config_list("test") \ No newline at end of file From 25308e0b3d6f827bb573e6755f6d3b5a12b680ed Mon Sep 17 00:00:00 2001 From: ShenShuo137 <13722929179@139.com> Date: Sun, 9 Nov 2025 16:42:12 +0800 Subject: [PATCH 5/5] =?UTF-8?q?=E6=8F=90=E4=BA=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- user_auth.py | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/user_auth.py b/user_auth.py index ea3bf2a..823fad5 100644 --- a/user_auth.py +++ b/user_auth.py @@ -2,21 +2,30 @@ 配置管理模块 - 漏洞测试 包含两个安全问题示例 """ + import os import pickle import subprocess +import json + +# 导入保护:防止在生产环境中导入 +if os.getenv("APP_ENV") == "production": + raise ImportError("user_auth 模块仅用于测试,禁止在生产环境中导入") class ConfigManager: def __init__(self, config_dir): + if not os.path.exists(config_dir) or not os.path.isdir(config_dir): + raise ValueError(f"Invalid config directory: {config_dir}") self.config_dir = os.path.abspath(config_dir) - def save_config(self, filename, data): """保存配置 - 不安全的反序列化""" file_path = os.path.join(self.config_dir, filename) # 使用 pickle 存储数据,存在反序列化漏洞 with open(file_path, 'wb') as f: - pickle.dump(data, f) # ❌ 不安全的反序列化 (CWE-502) + with open(file_path, 'w') as f: + json.dump(data, f) # 使用JSON进行安全序列化 + print(f"配置已保存到 {file_path}") return True