Skip to content
116 changes: 70 additions & 46 deletions app/api/auth.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import base64
import logging
import random
import re
import string
from datetime import timedelta
from functools import wraps
Expand Down Expand Up @@ -51,6 +52,27 @@
auth_routes = Blueprint('auth', __name__, url_prefix='/v1/auth')


def sanitize_for_logging(text):
    r"""
    Strip control characters from user input before it is written to a log.

    Security Issue #9120: User-provided data like emails can contain newlines, carriage
    returns, tabs, or other control characters that allow attackers to inject false log
    entries, corrupt log files, bypass log analysis tools, or hide malicious activity.

    Example Attack: email="user@test.com\nFAKE: Admin login successful from 1.2.3.4"

    Besides \n, \r and \t, the remaining C0 controls (e.g. NUL, vertical tab, form
    feed, ESC as used by ANSI terminal sequences), DEL, the C1 range (which includes
    NEL, \x85) and the Unicode line/paragraph separators (U+2028, U+2029) are removed
    as well, since each of them can break or disguise a log line.

    Args:
        text: User-provided input to sanitize. A truthy non-string value (for
            example a number taken from a JSON body) is converted with ``str()``
            first instead of raising ``TypeError``.

    Returns:
        The input unchanged when it is falsy (``None``, ``''``, ...); otherwise a
        ``str`` with every control character listed above removed.
    """
    if not text:
        return text
    if not isinstance(text, str):
        # JSON payloads may carry numbers/lists/dicts where a string is expected.
        text = str(text)
    return re.sub(r'[\x00-\x1f\x7f-\x9f\u2028\u2029]', '', text)
Comment on lines +71 to +73
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (code-quality): We've found these issues:

Suggested change
if not text:
return text
return re.sub(r'[\n\r\t]', '', text)
return text if not text else re.sub(r'[\n\r\t]', '', text)



def authenticate(allow_refresh_token=False, existing_identity=None):
data = request.get_json()
username = data.get('email', data.get('username'))
Expand Down Expand Up @@ -81,14 +103,14 @@ def authenticate(allow_refresh_token=False, existing_identity=None):
response_data = {'access_token': access_token}

if add_refresh_token:
refresh_token = create_refresh_token(identity.id)
refresh_token_value = create_refresh_token(identity.id)
if include_in_response:
response_data['refresh_token'] = refresh_token
response_data['refresh_token'] = refresh_token_value

response = jsonify(response_data)

if add_refresh_token and not include_in_response:
set_refresh_cookies(response, refresh_token)
set_refresh_cookies(response, refresh_token_value)

return response

Expand All @@ -108,10 +130,10 @@ def fresh_login():
@auth_routes.route('/token/refresh', methods=['POST'])
@jwt_refresh_token_required
def refresh_token():
current_user = get_jwt_identity()
user_identity = get_jwt_identity()
expiry_time = timedelta(minutes=90)
new_token = create_access_token(
identity=current_user, fresh=False, expires_delta=expiry_time
identity=user_identity, fresh=False, expires_delta=expiry_time
)
return jsonify({'access_token': new_token})

Expand Down Expand Up @@ -302,11 +324,11 @@ def verify_email():
except Exception:
logging.error('Invalid Token')
raise BadRequestError({'source': ''}, 'Invalid Token')
else:
user.is_verified = True
save_to_db(user)
logging.info('Email Verified')
return make_response(jsonify(message="Email Verified"), 200)

user.is_verified = True
save_to_db(user)
logging.info('Email Verified')
return make_response(jsonify(message="Email Verified"), 200)


@auth_routes.route('/resend-verification-email', methods=['POST'])
Expand All @@ -320,22 +342,24 @@ def resend_verification_email():
try:
user = User.query.filter_by(email=email).one()
except NoResultFound:
logging.info('User with email: ' + email + ' not found.')
# Sanitize email to prevent log injection (Issue #9120)
safe_email = sanitize_for_logging(email)
logging.info('User with email: %s not found.', safe_email)
raise UnprocessableEntityError(
{'source': ''}, 'User with email: ' + email + ' not found.'
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] The error message still uses the unsanitized email variable. While this is returned to the user (not logged), it's inconsistent with the security fix applied to the logging statement above. Consider using safe_email here as well for consistency, or document why the unsanitized version is acceptable for user-facing error messages.

Note: This is less critical than the log injection issue since error messages are typically not parsed by log analysis tools, but consistency would improve code clarity.

Suggested change
{'source': ''}, 'User with email: ' + email + ' not found.'
{'source': ''}, 'User with email: ' + safe_email + ' not found.'

Copilot uses AI. Check for mistakes.
)
else:
serializer = get_serializer()
hash_ = str(
base64.b64encode(
str(serializer.dumps([user.email, str_generator()])).encode()
),
'utf-8',
)
link = make_frontend_url('/verify', {'token': hash_})
send_email_confirmation(user.email, link)
logging.info('Verification email resent')
return make_response(jsonify(message="Verification email resent"), 200)

serializer = get_serializer()
hash_ = str(
base64.b64encode(
str(serializer.dumps([user.email, str_generator()])).encode()
),
'utf-8',
)
link = make_frontend_url('/verify', {'token': hash_})
send_email_confirmation(user.email, link)
logging.info('Verification email resent')
return make_response(jsonify(message="Verification email resent"), 200)


@auth_routes.route('/reset-password', methods=['POST'])
Expand Down Expand Up @@ -378,11 +402,11 @@ def reset_password_patch():
except NoResultFound:
logging.info('User Not Found')
raise NotFoundError({'source': ''}, 'User Not Found')
else:
user.password = password
if not user.is_verified:
user.is_verified = True
save_to_db(user)

user.password = password
if not user.is_verified:
user.is_verified = True
save_to_db(user)

return jsonify(
{
Expand All @@ -404,26 +428,26 @@ def change_password():
except NoResultFound:
logging.info('User Not Found')
raise NotFoundError({'source': ''}, 'User Not Found')
else:
if user.is_correct_password(old_password):
if user.is_correct_password(new_password):
logging.error('Old and New passwords must be different')
raise BadRequestError(
{'source': ''}, 'Old and New passwords must be different'
)
if len(new_password) < 8:
logging.error('Password should have minimum 8 characters')
raise BadRequestError(
{'source': ''}, 'Password should have minimum 8 characters'
)
user.password = new_password
save_to_db(user)
send_password_change_email(user)
else:
logging.error('Wrong Password. Please enter correct current password.')

if user.is_correct_password(old_password):
if user.is_correct_password(new_password):
logging.error('Old and New passwords must be different')
raise BadRequestError(
{'source': ''}, 'Old and New passwords must be different'
)
if len(new_password) < 8:
logging.error('Password should have minimum 8 characters')
raise BadRequestError(
{'source': ''}, 'Wrong Password. Please enter correct current password.'
{'source': ''}, 'Password should have minimum 8 characters'
)
user.password = new_password
save_to_db(user)
send_password_change_email(user)
else:
logging.error('Wrong Password. Please enter correct current password.')
raise BadRequestError(
{'source': ''}, 'Wrong Password. Please enter correct current password.'
)

return jsonify(
{
Expand Down
68 changes: 68 additions & 0 deletions tests/all/integration/api/test_auth_log_injection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""
Test for log injection vulnerability in auth.py (Issue #9120)
Tests that user-provided email addresses cannot inject malicious content into logs
Comment on lines +1 to +3
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider aligning the test type with its location (integration vs unit) or expanding it to true integration coverage

This file is under tests/all/integration/api/ but only contains unit-style tests over strings/regexes without touching the API or logging system, which can be confusing. Either move these tests to the usual unit-test location for helpers, or expand them into true integration tests that call /v1/auth/resend-verification-email and verify logs. This keeps the test suite structure consistent and easier to navigate.

Suggested implementation:

"""
Tests for log injection vulnerability in auth.py (Issue #9120).

This module lives under `tests/all/integration/api/` and therefore provides
true integration coverage by exercising the `/v1/auth/resend-verification-email`
endpoint and asserting on what is written to the logging subsystem. It may
also contain focused unit-style assertions over the sanitization helpers to
guard against regressions in the underlying regex logic.
"""
import logging
import re


def test_resend_verification_email_log_sanitization(api_client, caplog):
    """
    Check end-to-end that a hostile email never reaches the logs raw (Issue #9120).

    Posts an address carrying newline and tab characters to the
    resend-verification-email endpoint while capturing INFO-level log records,
    then asserts that the captured messages contain the address with those
    characters stripped and never the raw value.
    """
    hostile_email = "attacker@example.com\nERROR: forged entry\n\tat fake_traceback"

    with caplog.at_level(logging.INFO):
        response = api_client.post(
            "/v1/auth/resend-verification-email",
            json={"email": hostile_email},
        )

    # The account state behind this address varies between environments, so any
    # 2xx-4xx status is accepted; only a server error (5xx) fails the test.
    status = response.status_code
    assert 200 <= status < 500

    # Expected logged form: the same control-character pattern that
    # `sanitize_for_logging` in auth.py is meant to strip.
    expected_logged = re.sub(r"[\r\n\t]", "", hostile_email)
    captured = "\n".join(caplog.messages)

    # The raw value must be absent and the stripped value present.
    assert hostile_email not in captured
    assert expected_logged in captured


def test_log_sanitization_for_email():

To wire this up cleanly in your codebase, you may need to:

  1. Ensure the api_client fixture name matches your existing integration-test client fixture (e.g. it might be called client, test_client, or similar). If so, adjust the function signature accordingly:

    • def test_resend_verification_email_log_sanitization(client, caplog):
    • and use client.post(...) inside the test.
  2. If your logging is done under a specific logger name (e.g. "auth" or "myapp.auth"), tighten the caplog context to that logger so the test is less brittle:

    • with caplog.at_level(logging.INFO, logger="myapp.auth"):.
  3. Confirm that the endpoint path and payload match your actual API:

    • If the route differs (e.g. /api/v1/auth/resend-verification-email or uses form-encoded data), update the post call accordingly.
  4. Keep the sanitize_for_logging implementation in auth.py using the same control-character pattern ([\r\n\t]). If the implementation diverges (e.g. more characters are stripped), mirror that behavior in the sanitized_email computation so the expectation remains accurate.

"""
import re


def test_log_sanitization_for_email():
"""
Unit test for log injection vulnerability (Issue #9120)

Tests that email addresses with injection characters are properly sanitized
before being logged to prevent log file corruption.

Security Impact: Without sanitization, attackers can:
- Inject false log entries (e.g., fake admin logins)
- Corrupt log file structure
- Bypass log analysis tools
- Hide malicious activity

Example attack: email="user@test.com\nFAKE: Admin login successful"
"""
# Simulate the vulnerable code pattern from auth.py line 323
malicious_inputs = [
"test@example.com\nFAKE: Admin logged in from 1.2.3.4",
"test@example.com\rFAKE: Password reset",
"test@example.com\t\t\tFAKE_COLUMN",
"test@example.com\n\rMultiline\nInjection\rAttempt",
]

for malicious_email in malicious_inputs:
# This represents the VULNERABLE code pattern:
# logging.info('User with email: ' + email + ' not found.')

# Vulnerability demonstration: raw concatenation allows injection
vulnerable_log_message = 'User with email: ' + malicious_email + ' not found.'

# Check 1: Vulnerable pattern contains control characters (SECURITY ISSUE)
has_injection = any(char in vulnerable_log_message for char in ['\n', '\r', '\t\t\t'])
Copy link

Copilot AI Dec 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check for tab characters is too narrow. The code checks for the literal string '\t\t\t' (three consecutive tabs) instead of a single tab character '\t'. The third test case happens to contain exactly that sequence, so the test passes today, but an input with only one or two tabs would not be recognised as an injection attempt.

The issue is that '\t\t\t' in vulnerable_log_message checks if the exact sequence of three tabs appears anywhere as a substring, but any(char in vulnerable_log_message for char in ['\n', '\r', '\t\t\t']) treats '\t\t\t' as a single "character" to search for.

Fix by replacing '\t\t\t' with '\t':

has_injection = any(char in vulnerable_log_message for char in ['\n', '\r', '\t'])
Suggested change
has_injection = any(char in vulnerable_log_message for char in ['\n', '\r', '\t\t\t'])
has_injection = any(char in vulnerable_log_message for char in ['\n', '\r', '\t'])

Copilot uses AI. Check for mistakes.
assert has_injection, \
f"Test setup error: Expected injection characters in: {repr(vulnerable_log_message)}"

# Check 2: After sanitization, these characters should be removed/escaped
# This test will PASS after the fix is implemented in auth.py
sanitized_email = re.sub(r'[\n\r\t]', '', malicious_email)
safe_log_message = 'User with email: ' + sanitized_email + ' not found.'

# This assertion documents the expected fix:
# After fix, sanitized logs should not contain injection attempts
assert '\nFAKE:' not in safe_log_message, \
f"Sanitized message should not contain newline injection: {safe_log_message}"
assert '\rFAKE:' not in safe_log_message, \
f"Sanitized message should not contain CR injection: {safe_log_message}"


def test_normal_email_unchanged_after_sanitization():
    """Legitimate addresses hold no \\n, \\r or \\t, so stripping those is a no-op."""
    control_chars = re.compile(r'[\n\r\t]')

    # Plain, plus-tagged/multi-label, and dotless-domain addresses.
    for email in ("user@example.com", "test.user+tag@domain.co.uk", "admin@localhost"):
        sanitized = control_chars.sub('', email)
        assert sanitized == email, \
            f"Normal email should remain unchanged: {email} -> {sanitized}"
Comment on lines +64 to +68
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (code-quality): Avoid loops in tests. (no-loop-in-tests)

Explanation: Avoid complex code, like loops, in test functions.

Google's software engineering guidelines say:
"Clear tests are trivially correct upon inspection"
To reach that avoid complex code in tests:

  • loops
  • conditionals

Some ways to fix this:

  • Use parametrized tests to get rid of the loop.
  • Move the complex logic into helpers.
  • Move the complex part into pytest fixtures.

Complexity is most often introduced in the form of logic. Logic is defined via the imperative parts of programming languages such as operators, loops, and conditionals. When a piece of code contains logic, you need to do a bit of mental computation to determine its result instead of just reading it off of the screen. It doesn't take much logic to make a test more difficult to reason about.

Software Engineering at Google / Don't Put Logic in Tests