From 9656a05733d70cbd780022b7903730255e1810ec Mon Sep 17 00:00:00 2001 From: Aadarsh Padiyath Date: Tue, 9 Jun 2026 15:37:38 -0400 Subject: [PATCH 1/3] Fix CodeTailor security vulnerabilities - Add authentication (Depends(auth_manager)) to parsons_scaffolding endpoint - Remove exec()-based Python test runner; replace with JOBE sandbox via evaluate_fixed_code.py - Fetch test code from DB (question_json.suffix_code) instead of accepting it from request body - Add clean_python_testcase() to transform Runestone unittest.gui format for JOBE compatibility Co-Authored-By: Claude Sonnet 4.6 --- bases/rsptx/book_server_api/routers/coach.py | 92 ++++++++++++--- .../evaluate_fixed_code.py | 110 +++++++++--------- 2 files changed, 131 insertions(+), 71 deletions(-) diff --git a/bases/rsptx/book_server_api/routers/coach.py b/bases/rsptx/book_server_api/routers/coach.py index ce1b543f2..255d0ae18 100644 --- a/bases/rsptx/book_server_api/routers/coach.py +++ b/bases/rsptx/book_server_api/routers/coach.py @@ -14,7 +14,7 @@ # Third-party imports # ------------------- -from fastapi import APIRouter, Request +from fastapi import APIRouter, Depends, Request from pyflakes import checker as pyflakes_checker # Local application imports @@ -32,8 +32,10 @@ from .assessment import get_question_source, SelectQRequest # Import function for fetching api - comment out for DEV purposes +from rsptx.auth.session import auth_manager from rsptx.db.crud.crud import fetch_api_token from rsptx.db.crud.course import fetch_course +from rsptx.db.crud.question import fetch_question # .. _APIRouter config: # @@ -85,6 +87,24 @@ async def python_check(request: Request): # for dev/test -- replace with your own key for local testing +def clean_python_testcase(raw_test_code: str) -> str: + """ + Transform Runestone browser-style test code to standard unittest format. + Mirrors the cleanTestcase() transformation in activecode.js so that + suffix_code from the DB can be run by JOBE (which has no unittest.gui). + """ + result = re.sub( + r"from unittest\.gui import TestCaseGui\s*\n", + "import unittest\n", + raw_test_code, + ) + result = result.replace( + "class myTests(TestCaseGui):", "class myTests(unittest.TestCase):" + ) + result = re.sub(r"^\s*myTests\(\)\.main\(\)\s*$", "", result, flags=re.MULTILINE) + return result + + def extract_parsons_code(html_block): """ Given the full HTML/pre block for a Parsons problem extracted from DB, @@ -145,7 +165,9 @@ async def get_question_html(request: Request, div_id: str): # @router.post("/ns/coach/parsons_scaffolding") @router.post("/parsons_scaffolding") -async def parsons_scaffolding(request: Request, course: Optional[str]): +async def parsons_scaffolding( + request: Request, course: Optional[str], user=Depends(auth_manager) +): # Get `course` directly from the query string rslogger.warning(f"URL seen: {request.url}") rslogger.warning(f"Query parameters: {request.query_params}") @@ -211,24 +233,56 @@ async def parsons_scaffolding(request: Request, course: Optional[str]): _ = req_bytes.decode("utf-8") data = await request.json() - language = data.get("language") # Capture the question language from the front end - student_code = data.get( - "student_code" - ) # Capture the student code from the front end - problem_id = data.get("problem_id") # Capture the problem name from the front end - personalization_level = data.get( - "personalization_level" - ) # Capture the personalization level set by the instructor from the front end - parsonsexample = data.get( - "parsonsexample" - ) # Capture whether the scaffolding puzzle is a pre-defined example or LLM-example - problem_description = data.get( - "problem_description" - ) # Capture the problem description from the front end - internal_test_case = data.get( - "internal_test_case" - ) # Capture the internal test case from the front end + language = data.get("language") + student_code = data.get("student_code") + problem_id = data.get("problem_id") + personalization_level = data.get("personalization_level") + parsonsexample = data.get("parsonsexample") + problem_description = data.get("problem_description") parsons_personalized = data.get("parsons_personalized", True) + + if not problem_id: + return JSONResponse( + content={"error": "CodeTailor: problem_id is required"}, + status_code=status.HTTP_400_BAD_REQUEST, + ) + + # Fetch the test code from the database using the problem_id. + try: + question = await fetch_question(problem_id) + if question and question.question_json: + internal_test_case = question.question_json.get("suffix_code", "") or "" + else: + rslogger.error( + f"CodeTailor: no question found for problem_id '{problem_id}'" + ) + return JSONResponse( + content={"error": f"CodeTailor: question '{problem_id}' not found"}, + status_code=status.HTTP_400_BAD_REQUEST, + ) + except Exception as e: + rslogger.error(f"CodeTailor: could not fetch test code for '{problem_id}': {e}") + return JSONResponse( + content={ + "error": f"CodeTailor: could not fetch test code for '{problem_id}'" + }, + status_code=status.HTTP_400_BAD_REQUEST, + ) + + if not internal_test_case: + rslogger.error( + f"CodeTailor: question '{problem_id}' has no suffix_code in question_json — cannot validate generated code" + ) + return JSONResponse( + content={ + "error": f"CodeTailor: question '{problem_id}' has no test code in the database" + }, + status_code=status.HTTP_400_BAD_REQUEST, + ) + + if language and language.lower() == "python": + internal_test_case = clean_python_testcase(internal_test_case) + print("start_to: get_parsons_help", api_token, language, personalization_level) adaptive_attr = 'data-adaptive="true"' diff --git a/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py b/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py index 8a335ed22..a2558da7a 100644 --- a/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py +++ b/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py @@ -1,9 +1,5 @@ -import unittest import re import difflib -from types import ModuleType -import threading -import signal import requests as rq import hashlib import base64 @@ -11,22 +7,6 @@ from ..rsproxy import settings -class NullOutput: - def write(self, _): - pass - - def flush(self): - pass - - -class TimeoutError(Exception): - pass - - -def handler(signum, frame): - raise TimeoutError("Test execution exceeded time limit") - - def _runestone_file_id(filename: str, content: str) -> str: # Exactly: "runestone" + MD5(fileName + fileContent) md5 = hashlib.md5((filename + content).encode("utf-8")).hexdigest() @@ -229,45 +209,71 @@ def extract_class_name(code): return False +class _JobeTestResult: + """Minimal stand-in for unittest.TestResult returned by load_and_run_tests.""" + + def __init__(self, passed: bool): + self._passed = passed + + def wasSuccessful(self) -> bool: + return self._passed + + def load_and_run_tests(unittest_case, code_to_test, time_limit=6): """ - Load and run Python test cases against the provided code. + Run Python test cases against the provided code via JOBE. + Inputs: - unittest_case (str): The Python test cases. The test code is automatically reformatted based on the unittest_code provided by instructors in the RST file. - code_to_test (str): The Python code to be tested. - time_limit (int): The time limit for running the tests in seconds. - Output: unittest.TestResult: The result of the test run. + unittest_case (str): unittest source (class myTests(unittest.TestCase): ...) + code_to_test (str): the Python solution code to validate + time_limit (int): JOBE wall-clock time limit in seconds + Output: _JobeTestResult with wasSuccessful() method """ - # Set the alarm signal for timeout - if threading.current_thread() is threading.main_thread(): - signal.signal(signal.SIGALRM, handler) - signal.alarm(time_limit) + combined = ( + code_to_test + + "\n\n" + + unittest_case + + "\n\nimport unittest as _ut\n" + + "_result = _ut.main(verbosity=0, exit=False)\n" + + 'print("PASS" if _result.result.wasSuccessful() else "FAIL")\n' + ) + + filename = "solution.py" + file_id = _runestone_file_id(filename, combined) + b64 = _b64_text_utf8(combined) + + sess = _jobe_session() + file_url = settings.jobe_server + "/jobe/index.php/restapi/files/" + file_id + runs_url = settings.jobe_server + "/jobe/index.php/restapi/runs/" try: - # Create a dummy module to hold the test cases - test_module = ModuleType("test_module") - test_module.unittest = unittest - - # Execute the test cases string within the dummy module's namespace - exec(unittest_case, test_module.__dict__) - # Execute the code to test within the desired scope - exec(code_to_test, test_module.__dict__) - # Retrieve the loaded test cases - test_suite = unittest.TestLoader().loadTestsFromModule(test_module) - print("test_suite", test_suite) - # Run the test suite - test_results = unittest.TextTestRunner( - verbosity=0, failfast=True, stream=NullOutput() - ).run(test_suite) - print("test_results", test_results) - - except TimeoutError: - print("test_results", test_results) - return False - finally: - signal.alarm(0) + r = sess.head(file_url, timeout=10) + if r.status_code != 204: + put = sess.put(file_url, json={"file_contents": b64}, timeout=10) + if put.status_code != 204: + return _JobeTestResult(False) - return test_results + runspec = { + "language_id": "python3", + "sourcecode": combined, + "sourcefilename": filename, + "parameters": {"timelimitsecs": time_limit}, + } + resp = sess.post(runs_url, json={"run_spec": runspec}, timeout=time_limit + 10) + try: + result = resp.json() + except Exception: + return _JobeTestResult(False) + + out = (result.get("stdout") or "").strip() + # Outcome 15 = success; outcome 12 = runtime error but some JOBE/Python3 + # sandbox configurations exit non-zero even when tests pass and "PASS" is + # printed. Trust stdout over the outcome code. + passed = out == "PASS" + return _JobeTestResult(passed) + + except Exception: + return _JobeTestResult(False) def fix_indentation(text): From 8bf3720d06db4306afc171aed7322bcf68989203 Mon Sep 17 00:00:00 2001 From: Aadarsh Padiyath Date: Tue, 9 Jun 2026 16:17:10 -0400 Subject: [PATCH 2/3] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- .../routers/personalized_parsons/evaluate_fixed_code.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py b/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py index a2558da7a..54b0a95e7 100644 --- a/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py +++ b/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py @@ -230,7 +230,8 @@ def load_and_run_tests(unittest_case, code_to_test, time_limit=6): Output: _JobeTestResult with wasSuccessful() method """ combined = ( - code_to_test + '__name__ = "__runestone__"\n' + + code_to_test + "\n\n" + unittest_case + "\n\nimport unittest as _ut\n" From 8b706064bf460036c356fe3fa4fa11ba193f7ba5 Mon Sep 17 00:00:00 2001 From: Aadarsh Padiyath Date: Tue, 9 Jun 2026 16:27:41 -0400 Subject: [PATCH 3/3] Address Copilot review comments on CodeTailor security PR - Suppress __name__ == "__main__" guards in student code sent to JOBE - Check last stdout line for PASS/FAIL to tolerate student debug prints - Remove redundant HEAD/PUT file-store calls (sourcecode already sent inline) - Pass basecourse to fetch_question to prevent wrong result from duplicate names Co-Authored-By: Claude Sonnet 4.6 --- bases/rsptx/book_server_api/routers/coach.py | 3 ++- .../evaluate_fixed_code.py | 23 ++++++------------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/bases/rsptx/book_server_api/routers/coach.py b/bases/rsptx/book_server_api/routers/coach.py index 255d0ae18..31ccfef7a 100644 --- a/bases/rsptx/book_server_api/routers/coach.py +++ b/bases/rsptx/book_server_api/routers/coach.py @@ -249,7 +249,8 @@ async def parsons_scaffolding( # Fetch the test code from the database using the problem_id. try: - question = await fetch_question(problem_id) + basecourse = getattr(course, "base_course", None) + question = await fetch_question(problem_id, basecourse=basecourse) if question and question.question_json: internal_test_case = question.question_json.get("suffix_code", "") or "" else: diff --git a/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py b/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py index 54b0a95e7..4d93f0300 100644 --- a/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py +++ b/bases/rsptx/book_server_api/routers/personalized_parsons/evaluate_fixed_code.py @@ -229,6 +229,7 @@ def load_and_run_tests(unittest_case, code_to_test, time_limit=6): time_limit (int): JOBE wall-clock time limit in seconds Output: _JobeTestResult with wasSuccessful() method """ + # Suppress __main__ guards in student code — JOBE runs as top-level script combined = ( '__name__ = "__runestone__"\n' + code_to_test @@ -239,25 +240,14 @@ def load_and_run_tests(unittest_case, code_to_test, time_limit=6): + 'print("PASS" if _result.result.wasSuccessful() else "FAIL")\n' ) - filename = "solution.py" - file_id = _runestone_file_id(filename, combined) - b64 = _b64_text_utf8(combined) - sess = _jobe_session() - file_url = settings.jobe_server + "/jobe/index.php/restapi/files/" + file_id runs_url = settings.jobe_server + "/jobe/index.php/restapi/runs/" try: - r = sess.head(file_url, timeout=10) - if r.status_code != 204: - put = sess.put(file_url, json={"file_contents": b64}, timeout=10) - if put.status_code != 204: - return _JobeTestResult(False) - runspec = { "language_id": "python3", "sourcecode": combined, - "sourcefilename": filename, + "sourcefilename": "solution.py", "parameters": {"timelimitsecs": time_limit}, } resp = sess.post(runs_url, json={"run_spec": runspec}, timeout=time_limit + 10) @@ -267,10 +257,11 @@ def load_and_run_tests(unittest_case, code_to_test, time_limit=6): return _JobeTestResult(False) out = (result.get("stdout") or "").strip() - # Outcome 15 = success; outcome 12 = runtime error but some JOBE/Python3 - # sandbox configurations exit non-zero even when tests pass and "PASS" is - # printed. Trust stdout over the outcome code. - passed = out == "PASS" + # Check the last line so student debug prints don't cause false failures. + # Trust stdout over outcome code: JOBE/Python3 may return outcome 12 + # even when tests pass. + last_line = out.splitlines()[-1].strip() if out else "" + passed = last_line == "PASS" return _JobeTestResult(passed) except Exception: