-
Notifications
You must be signed in to change notification settings - Fork 7.5k
fix(dashboard): cap job log reads to prevent dashboard agent OOM #61537
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -5,6 +5,14 @@ | |
| from ray.dashboard.modules.job.common import JOB_LOGS_PATH_TEMPLATE | ||
| from ray.dashboard.modules.job.utils import fast_tail_last_n_lines, file_tail_iterator | ||
|
|
||
# Cap on how many bytes get_logs() will read from a single job log file.
# Guards the dashboard agent against OOM when one job emits tens of
# gigabytes of stdout/stderr. Tune via the RAY_JOB_LOG_MAX_READ_BYTES
# environment variable (an integer number of bytes).
_DEFAULT_MAX_READ_BYTES = 16 * 1024 * 1024  # 16 MiB
JOB_LOG_MAX_READ_BYTES = int(
    os.environ.get("RAY_JOB_LOG_MAX_READ_BYTES", _DEFAULT_MAX_READ_BYTES)
)
|
|
||
|
|
||
| class JobLogStorageClient: | ||
| """ | ||
|
|
@@ -19,8 +27,30 @@ class JobLogStorageClient: | |
|
|
||
| def get_logs(self, job_id: str) -> str: | ||
| try: | ||
| with open(self.get_log_file_path(job_id), "r") as f: | ||
| return f.read() | ||
| log_path = self.get_log_file_path(job_id) | ||
| file_size = os.path.getsize(log_path) | ||
|
|
||
| if file_size <= JOB_LOG_MAX_READ_BYTES: | ||
| with open(log_path, "r", encoding="utf-8", errors="replace") as f: | ||
| return f.read() | ||
|
|
||
| # File exceeds the cap — read the tail in binary mode to avoid | ||
| # UnicodeDecodeError when the byte offset lands mid-character. | ||
| with open(log_path, "rb") as f: | ||
| f.seek(file_size - JOB_LOG_MAX_READ_BYTES) | ||
| f.readline() # skip partial first line | ||
| tail = f.read().decode("utf-8", errors="replace") | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unbounded
|
||
|
|
||
| total_human = ( | ||
| f"{file_size / (1024**3):.1f} GiB" | ||
| if file_size >= 1024**3 | ||
| else f"{file_size / (1024**2):.1f} MiB" | ||
| ) | ||
| cap_human = f"{JOB_LOG_MAX_READ_BYTES / (1024**2):.0f} MiB" | ||
| return ( | ||
| f"[LOG TRUNCATED — showing last {cap_human} " | ||
| f"of {total_human} total]\n{tail}" | ||
| ) | ||
| except FileNotFoundError: | ||
| return "" | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,137 @@ | ||
| import os | ||
| from unittest.mock import patch | ||
|
|
||
| import pytest | ||
|
|
||
| from ray.dashboard.modules.job.job_log_storage_client import ( | ||
| JOB_LOG_MAX_READ_BYTES, | ||
| JobLogStorageClient, | ||
| ) | ||
|
|
||
|
|
||
@pytest.fixture
def client():
    """Provide a fresh JobLogStorageClient for each test."""
    return JobLogStorageClient()
|
|
||
|
|
||
@pytest.fixture
def tmp_log_dir(tmp_path):
    """Return a writer that materializes a job log file in a temp dir.

    The writer accepts str or bytes content, writes it to
    ``job-driver-<job_id>.log`` under pytest's ``tmp_path``, and returns
    the file path as a string. Tests patch ``get_log_file_path`` to point
    at the returned path; this fixture itself patches nothing.
    """

    def _write_log(job_id: str, content, encoding="utf-8") -> str:
        path = tmp_path / f"job-driver-{job_id}.log"
        # bytes content lets tests control exact byte layout (e.g. to
        # exercise mid-character truncation).
        if isinstance(content, bytes):
            path.write_bytes(content)
        else:
            path.write_text(content, encoding=encoding)
        return str(path)

    return _write_log
|
|
||
|
|
||
class TestGetLogs:
    """Tests for JobLogStorageClient.get_logs() size capping."""

    def test_small_file_returned_fully(self, client, tmp_log_dir):
        # Files under the cap are returned verbatim, with no header.
        content = "line 1\nline 2\nline 3\n"
        log_path = tmp_log_dir("small-job", content)

        with patch.object(client, "get_log_file_path", return_value=log_path):
            result = client.get_logs("small-job")

        assert result == content

    def test_missing_file_returns_empty_string(self, client):
        # A missing log file is not an error; callers get "".
        with patch.object(
            client, "get_log_file_path", return_value="/nonexistent/path.log"
        ):
            result = client.get_logs("missing-job")

        assert result == ""

    def test_large_file_truncated_with_notice(self, client, tmp_log_dir):
        # Create a file larger than the cap
        line = "x" * 999 + "\n"  # 1000 bytes per line
        num_lines = (JOB_LOG_MAX_READ_BYTES // 1000) + 100
        content = line * num_lines
        log_path = tmp_log_dir("large-job", content)

        with patch.object(client, "get_log_file_path", return_value=log_path):
            result = client.get_logs("large-job")

        assert result.startswith("[LOG TRUNCATED")
        assert "MiB" in result
        # The result should be significantly smaller than the original
        assert len(result) <= JOB_LOG_MAX_READ_BYTES + 1024  # allow header overhead
        # The tail should end on a complete line (trailing newline intact)
        assert result.endswith("\n")

    def test_large_file_starts_on_clean_line_boundary(self, client, tmp_log_dir):
        # Use variable-length lines so we can verify partial line is skipped
        lines = [f"LINE-{i:06d}-{'y' * 80}\n" for i in range(200_000)]
        content = "".join(lines)
        log_path = tmp_log_dir("boundary-job", content)

        with patch.object(client, "get_log_file_path", return_value=log_path):
            result = client.get_logs("boundary-job")

        # After the truncation header, each line should be complete
        body_lines = result.split("\n")
        # First line is the truncation notice
        assert body_lines[0].startswith("[LOG TRUNCATED")
        # Remaining non-empty lines should all start with "LINE-"
        for ln in body_lines[1:]:
            if ln:
                assert ln.startswith("LINE-"), f"Partial line found: {ln[:50]}"

    def test_multibyte_utf8_does_not_raise(self, client, tmp_log_dir):
        """Seek landing mid-UTF-8 character must not raise UnicodeDecodeError."""
        from ray.dashboard.modules.job import job_log_storage_client as mod

        original = mod.JOB_LOG_MAX_READ_BYTES
        # Use a small cap so the seek is likely to land mid-character.
        mod.JOB_LOG_MAX_READ_BYTES = 512
        try:
            # Build content with multi-byte chars (emoji = 4 bytes each).
            # Enough to exceed 512 bytes.
            line = "日志输出:🚀🎉🔥 data\n"  # mix of 3-byte and 4-byte chars
            content = (line * 200).encode("utf-8")
            assert len(content) > 512
            log_path = tmp_log_dir("utf8-job", content)

            with patch.object(client, "get_log_file_path", return_value=log_path):
                result = client.get_logs("utf8-job")

            assert result.startswith("[LOG TRUNCATED")
            # Should contain recognizable content (replacement chars are OK)
            assert "data" in result
        finally:
            mod.JOB_LOG_MAX_READ_BYTES = original

    @patch.dict(os.environ, {"RAY_JOB_LOG_MAX_READ_BYTES": "4096"})
    def test_cap_is_configurable_via_env(self, client, tmp_log_dir):
        # Reload the module so the patched env var drives the cap, rather
        # than duplicating the module's initialization expression here.
        import importlib

        from ray.dashboard.modules.job import job_log_storage_client as mod

        original = mod.JOB_LOG_MAX_READ_BYTES
        try:
            importlib.reload(mod)
            assert mod.JOB_LOG_MAX_READ_BYTES == 4096

            content = "a" * 999 + "\n"  # 1000 bytes per line
            content = content * 10  # 10,000 bytes total > 4096
            log_path = tmp_log_dir("env-job", content)

            with patch.object(client, "get_log_file_path", return_value=log_path):
                result = client.get_logs("env-job")

            assert result.startswith("[LOG TRUNCATED")
        finally:
            # Restore the original value to ensure test isolation.
            mod.JOB_LOG_MAX_READ_BYTES = original
|
Comment on lines
+110
to
+131
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Contributor comment: While this test correctly verifies the functionality and cleans up after itself, it duplicates the initialization logic from the module under test. A more robust approach is to use `importlib.reload` so the patched environment variable drives the value through the module's own initialization code: def test_cap_is_configurable_via_env(self, client, tmp_log_dir):
import importlib
from ray.dashboard.modules.job import job_log_storage_client as mod
original = mod.JOB_LOG_MAX_READ_BYTES
try:
# Reload the module to apply the environment variable patch.
importlib.reload(mod)
assert mod.JOB_LOG_MAX_READ_BYTES == 4096
content = "a" * 999 + "\n" # 1000 bytes per line
content = content * 10 # 10,000 bytes total > 4096
log_path = tmp_log_dir("env-job", content)
with patch.object(client, "get_log_file_path", return_value=log_path):
result = client.get_logs("env-job")
assert result.startswith("[LOG TRUNCATED")
finally:
# Restore the original value to ensure test isolation.
mod.JOB_LOG_MAX_READ_BYTES = original |
||
|
|
||
|
|
||
if __name__ == "__main__":
    # Allow running this test module directly: `python <file> -v`.
    import sys

    sys.exit(pytest.main([__file__, "-v"]))


There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The cap should be applied on the receiver side instead of the sender side.
Why not paginate the logs after receiving them?