From fee483d691a56616cdca46d91240358e02277f84 Mon Sep 17 00:00:00 2001 From: Andreas Backx Date: Mon, 4 May 2026 21:05:41 +0200 Subject: [PATCH 1/2] Add some unit tests and integration tests for get_pager_file. --- tests/test_termui.py | 160 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) diff --git a/tests/test_termui.py b/tests/test_termui.py index 7aa260084..dd127effa 100644 --- a/tests/test_termui.py +++ b/tests/test_termui.py @@ -1,11 +1,17 @@ +import contextlib +import io import platform import shlex +import shutil +import subprocess import tempfile import time +from functools import partial from unittest.mock import patch import pytest +import click import click._termui_impl from click._compat import WIN from click._termui_impl import Editor @@ -620,6 +626,160 @@ def test_pager_shlex_split(pager_env, expected_parts): assert shlex.split(pager_env) == expected_parts +def test_get_pager_file_reuses_existing_maybe_strip_ansi(monkeypatch): + """An already-wrapped pager stream should be yielded unchanged.""" + buffer = io.BytesIO() + wrapped = click._termui_impl.MaybeStripAnsi(buffer, color=True, encoding="utf-8") + + @contextlib.contextmanager + def pager_contextmanager(color=None): + yield wrapped, "utf-8", color + + monkeypatch.setattr( + click._termui_impl, "_pager_contextmanager", pager_contextmanager + ) + + with click.get_pager_file(color=True) as pager: + assert pager is wrapped + pager.write("hello") + + assert buffer.getvalue() == b"hello" + + +def test_get_pager_file_keeps_text_stream_without_buffer(monkeypatch): + """ + A text stream with no ``.buffer`` should not be re-wrapped and have + its color field set. + """ + + class TextStreamWithoutBuffer: + encoding = "utf-8" + + def __init__(self): + self.value = "" + self.color = None + + def write(self, text): + self.value += text + return len(text) + + def flush(self): + pass + + stream = TextStreamWithoutBuffer() + styled_text = click.style("hello", fg="red") + + @contextlib.contextmanager + def pager_contextmanager(color=None): + yield stream, "utf-8", color + + monkeypatch.setattr( + click._termui_impl, "_pager_contextmanager", pager_contextmanager + ) + + with click.get_pager_file(color=False) as pager: + assert pager is stream + assert pager.color is False + pager.write(styled_text) + + assert stream.value == styled_text + + +def _run_get_pager_file_with_real_pager(monkeypatch, tmp_path, writer, color=False): + pager_path = shutil.which("cat") + assert pager_path is not None, "cat not available" + + pager_out = tmp_path / "pager_out.txt" + + monkeypatch.setattr(click._termui_impl, "isatty", lambda _: True) + monkeypatch.setitem(click._termui_impl.os.environ, "PAGER", pager_path) + + with pager_out.open("w") as f: + force_subprocess_stdout = patch.object( + subprocess, + "Popen", + partial(subprocess.Popen, stdout=f), + ) + with force_subprocess_stdout: + with click.get_pager_file(color=color) as pager: + writer(pager) + + return pager_out.read_text() + + +@pytest.mark.skipif(WIN, reason="Exercises the pipe pager path.") +def test_get_pager_file_supports_multiple_write_sites(monkeypatch, tmp_path): + """Different helpers can write to the same pager stream.""" + + def writer(pager): + pager.write("prefix\n") + click.echo("middle", file=pager) + pager.write("suffix\n") + + output = _run_get_pager_file_with_real_pager(monkeypatch, tmp_path, writer) + + assert output == "prefix\nmiddle\nsuffix\n" + + +@pytest.mark.skipif(WIN, reason="Exercises the pipe pager path.") +@pytest.mark.parametrize( + ("text", "color", "expected"), + [ + pytest.param("hello\n", False, "hello\n", id="plain text"), + pytest.param( + click.style("hello", fg="red") + "\n", False, "hello\n", id="strip ansi" + ), + pytest.param( + click.style("hello", fg="red") + "\n", + True, + click.style("hello", fg="red") + "\n", + id="preserve ansi", + ), + pytest.param("", False, "", id="empty string"), + ], +) +def test_get_pager_file_with_real_pager_data_cases( + monkeypatch, tmp_path, text, color, expected +): + """Real pager output should match text and color expectations.""" + output = _run_get_pager_file_with_real_pager( + monkeypatch, tmp_path, lambda pager: pager.write(text), color=color + ) + + assert output == expected + + +def test_get_pager_file_flushes_stream_on_exception(monkeypatch): + """Exceptions should still flush the yielded stream in ``finally``.""" + + class FlushableTextStream: + encoding = "utf-8" + + def __init__(self): + self.color = None + self.flush_calls = 0 + + def flush(self): + self.flush_calls += 1 + + stream = FlushableTextStream() + + @contextlib.contextmanager + def pager_contextmanager(color=None): + yield stream, "utf-8", color + + monkeypatch.setattr( + click._termui_impl, "_pager_contextmanager", pager_contextmanager + ) + + with pytest.raises(RuntimeError, match="boom"): + with click.get_pager_file() as pager: + assert pager is stream + raise RuntimeError("boom") + + assert stream.flush_calls == 1 + + def test_editor_unclosed_quote(): """An unclosed quote in the editor command raises ValueError.""" with pytest.raises(ValueError, match="No closing quotation"): From c9954405e47e52bf02c8ac98c37deda3a4381471 Mon Sep 17 00:00:00 2001 From: Andreas Backx Date: Tue, 5 May 2026 19:18:44 +0100 Subject: [PATCH 2/2] Redesigned tests and get_pager_file branching to be more clear and not set color unknowingly. --- src/click/_termui_impl.py | 34 ++++--- tests/test_termui.py | 195 ++++++++++++++++++-------------------- 2 files changed, 117 insertions(+), 112 deletions(-) diff --git a/src/click/_termui_impl.py b/src/click/_termui_impl.py index 286271c8a..d3ef3cf84 100644 --- a/src/click/_termui_impl.py +++ b/src/click/_termui_impl.py @@ -32,6 +32,19 @@ V = t.TypeVar("V") + +class _BufferedTextPagerStream(t.Protocol): + buffer: t.BinaryIO + + +def _has_binary_buffer( + stream: t.BinaryIO | t.TextIO, +) -> t.TypeGuard[_BufferedTextPagerStream]: + # TextIO is wider than TextIOWrapper; text-only streams such as StringIO + # are valid TextIO values but do not expose a binary buffer to wrap. + return getattr(stream, "buffer", None) is not None + + if os.name == "nt": BEFORE_BAR = "\r" AFTER_BAR = "\n" @@ -417,18 +430,17 @@ def get_pager_file(color: bool | None = None) -> t.Generator[t.TextIO, None, Non default is autodetection. """ with _pager_contextmanager(color=color) as (stream, encoding, color): - if not isinstance(stream, MaybeStripAnsi): - if hasattr(stream, "buffer"): - # Real TextIO with buffer - unwrap and wrap in MaybeStripAnsi - stream = MaybeStripAnsi(stream.buffer, color=color, encoding=encoding) - elif not getattr(stream, "encoding", None): - # BinaryIO - wrap directly in MaybeStripAnsi - stream = MaybeStripAnsi(stream, color=color, encoding=encoding) - else: - # StringIO - add .color attribute only, no ANSI stripping - stream.color = color # type: ignore[attr-defined] + # Split streams by capabilities rather than the abstract TextIO / + # BinaryIO annotations: buffered text streams can be unwrapped to bytes, + # while text-only streams are yielded as-is. + if _has_binary_buffer(stream): + # Text stream backed by a binary buffer. + stream = MaybeStripAnsi(stream.buffer, color=color, encoding=encoding) + elif isinstance(stream, t.BinaryIO): + # Binary stream + stream = MaybeStripAnsi(stream, color=color, encoding=encoding) try: - yield t.cast(t.TextIO, stream) + yield stream finally: stream.flush() diff --git a/tests/test_termui.py b/tests/test_termui.py index dd127effa..58870533d 100644 --- a/tests/test_termui.py +++ b/tests/test_termui.py @@ -3,10 +3,9 @@ import platform import shlex import shutil -import subprocess +import sys import tempfile import time -from functools import partial from unittest.mock import patch import pytest @@ -626,136 +625,130 @@ def test_pager_shlex_split(pager_env, expected_parts): assert shlex.split(pager_env) == expected_parts -def test_get_pager_file_reuses_existing_maybe_strip_ansi(monkeypatch): - """An already-wrapped pager stream should be yielded unchanged.""" - buffer = io.BytesIO() - wrapped = click._termui_impl.MaybeStripAnsi(buffer, color=True, encoding="utf-8") +def _get_real_pager_command() -> str: + """Return a platform pager used to exercise the BinaryIO pager branch.""" + pager_name = "more" if WIN else "cat" + pager_path = shutil.which(pager_name) + assert pager_path is not None, f"{pager_name} not available" + return pager_path - @contextlib.contextmanager - def pager_contextmanager(color=None): - yield wrapped, "utf-8", color - - monkeypatch.setattr( - click._termui_impl, "_pager_contextmanager", pager_contextmanager - ) - - with click.get_pager_file(color=True) as pager: - assert pager is wrapped - pager.write("hello") - - assert buffer.getvalue() == b"hello" - - -def test_get_pager_file_keeps_text_stream_without_buffer(monkeypatch): - """ - A text stream with no ``.buffer`` should not be re-wrapped and have - its color field set. - """ - - class TextStreamWithoutBuffer: - encoding = "utf-8" - - def __init__(self): - self.value = "" - self.color = None - - def write(self, text): - self.value += text - return len(text) - - def flush(self): - pass - - stream = TextStreamWithoutBuffer() - styled_text = click.style("hello", fg="red") - - @contextlib.contextmanager - def pager_contextmanager(color=None): - yield stream, "utf-8", color - - monkeypatch.setattr( - click._termui_impl, "_pager_contextmanager", pager_contextmanager - ) - - with click.get_pager_file(color=False) as pager: - assert pager is stream - assert pager.color is False - pager.write(styled_text) - - assert stream.value == styled_text - - -def _run_get_pager_file_with_real_pager(monkeypatch, tmp_path, writer, color=False): - pager_path = shutil.which("cat") - assert pager_path is not None, "cat not available" - - pager_out = tmp_path / "pager_out.txt" +def _run_get_pager_file_with_real_pager(monkeypatch, capfd, writer, color=False): + """Run through the platform pager backend selected by ``PAGER``.""" monkeypatch.setattr(click._termui_impl, "isatty", lambda _: True) - monkeypatch.setitem(click._termui_impl.os.environ, "PAGER", pager_path) - - with pager_out.open("w") as f: - force_subprocess_stdout = patch.object( - subprocess, - "Popen", - partial(subprocess.Popen, stdout=f), - ) - with force_subprocess_stdout: - with click.get_pager_file(color=color) as pager: - writer(pager) - - return pager_out.read_text() - + monkeypatch.setitem( + click._termui_impl.os.environ, "PAGER", _get_real_pager_command() + ) -@pytest.mark.skipif(WIN, reason="Exercises the pipe pager path.") -def test_get_pager_file_supports_multiple_write_sites(monkeypatch, tmp_path): - """Different helpers can write to the same pager stream.""" + with click.get_pager_file(color=color) as pager: + writer(pager) - def writer(pager): - pager.write("prefix\n") - click.echo("middle", file=pager) - pager.write("suffix\n") + # The real pager writes to the process stdout; stderr should stay quiet. + out, err = capfd.readouterr() + assert err == "" + return out - output = _run_get_pager_file_with_real_pager(monkeypatch, tmp_path, writer) - assert output == "prefix\nmiddle\nsuffix\n" +def _write_pager_from_multiple_sites(pager): + pager.write("prefix\n") + click.echo("middle", file=pager) + pager.write("suffix\n") -@pytest.mark.skipif(WIN, reason="Exercises the pipe pager path.") @pytest.mark.parametrize( - ("text", "color", "expected"), + ("writer", "color", "expected"), [ - pytest.param("hello\n", False, "hello\n", id="plain text"), pytest.param( - click.style("hello", fg="red") + "\n", False, "hello\n", id="strip ansi" + _write_pager_from_multiple_sites, + False, + "prefix\nmiddle\nsuffix\n", + id="multiple write sites", ), pytest.param( - click.style("hello", fg="red") + "\n", + lambda pager: pager.write("hello\n"), False, "hello\n", id="plain text" + ), + pytest.param( + lambda pager: pager.write(click.style("hello", fg="red") + "\n"), + False, + "hello\n", + id="strip ansi", + ), + pytest.param( + lambda pager: pager.write(click.style("hello", fg="red") + "\n"), True, click.style("hello", fg="red") + "\n", id="preserve ansi", ), - pytest.param("", False, "", id="empty string"), + pytest.param(lambda pager: pager.write(""), False, "", id="empty string"), ], ) -def test_get_pager_file_with_real_pager_data_cases( - monkeypatch, tmp_path, text, color, expected +def test_get_pager_file_with_real_pager_binary_stream( + monkeypatch, capfd, writer, color, expected ): - """Real pager output should match text and color expectations.""" + """A real pager should exercise the BinaryIO branch on Unix and Windows.""" output = _run_get_pager_file_with_real_pager( - monkeypatch, tmp_path, lambda pager: pager.write(text), color=color + monkeypatch, capfd, writer, color=color ) assert output == expected +@pytest.mark.parametrize( + ("color", "expected"), + [ + pytest.param(False, "hello\n", id="strip ansi"), + pytest.param(True, click.style("hello", fg="red") + "\n", id="preserve ansi"), + ], +) +def test_get_pager_file_nullpager_wraps_textio_stream( + monkeypatch, tmp_path, color, expected +): + """When paging falls back to a real TextIO stream, ``.buffer`` is wrapped.""" + pager_out = tmp_path / "pager_out.txt" + + with pager_out.open("w", encoding="utf-8") as text_stream: + monkeypatch.setattr( + click._termui_impl, "_default_text_stdout", lambda: text_stream + ) + monkeypatch.setattr( + click._termui_impl, "isatty", lambda stream: stream is not sys.stdin + ) + + with click.get_pager_file(color=color) as pager: + pager.write(click.style("hello", fg="red") + "\n") + + assert pager_out.read_text(encoding="utf-8") == expected + + +def test_get_pager_file_nullpager_keeps_stringio_stream(monkeypatch): + """The no-stdout fallback should keep a text-only stream and set ``.color``.""" + + created = [] + + def make_stringio(): + stream = io.StringIO() + created.append(stream) + return stream + + monkeypatch.setattr(sys, "stdout", None) + monkeypatch.setattr(click._termui_impl, "StringIO", make_stringio) + monkeypatch.setattr(click._termui_impl, "isatty", lambda _: False) + + styled_text = click.style("hello", fg="red") + + with click.get_pager_file(color=False) as pager: + assert pager is created[0] + pager.write(styled_text) + + assert created[0].getvalue() == styled_text + + def test_get_pager_file_flushes_stream_on_exception(monkeypatch): """Exceptions should still flush the yielded stream in ``finally``.""" - class FlushableTextStream: - encoding = "utf-8" - + class FlushableTextStream(io.StringIO): def __init__(self): + super().__init__() self.color = None self.flush_calls = 0