Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/click/_termui_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,7 @@ def _pipepager(
text = strip_ansi(text)

c.stdin.write(text)
c.stdin.flush()
except BrokenPipeError:
# In case the pager exited unexpectedly, ignore the broken pipe error.
pass
Expand Down
69 changes: 69 additions & 0 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import io
import os
import pathlib
import stat
import subprocess
import sys
import threading
import time
from collections import namedtuple
from contextlib import nullcontext
from decimal import Decimal
Expand Down Expand Up @@ -418,6 +421,72 @@ def test_echo_via_pager(monkeypatch, capfd, pager_cmd, test):
)


def test_echo_via_pager_flushes_incremental_output(monkeypatch, tmp_path):
pager_output = tmp_path / "pager-output.txt"

saw_first_line_before_second_yield = False

class FakePagerProcess:
def __init__(self, output_path: Path) -> None:
self._output_path = output_path
read_fd, write_fd = os.pipe()
self.stdin = io.TextIOWrapper(
io.BufferedWriter(os.fdopen(write_fd, "wb", buffering=0)),
encoding="utf-8",
)
self._reader = io.TextIOWrapper(
os.fdopen(read_fd, "rb", buffering=0),
encoding="utf-8",
)
self._thread = threading.Thread(target=self._consume, daemon=True)
self._thread.start()

def _consume(self) -> None:
with self._reader, self._output_path.open("w", encoding="utf-8") as f:
while True:
line = self._reader.readline()
if line == "":
break

f.write(line)
f.flush()

def terminate(self) -> None:
if not self.stdin.closed:
self.stdin.close()

def wait(self) -> int:
self._thread.join(timeout=1)
return 0

def fake_popen(*args, **kwargs):
return FakePagerProcess(pager_output)

def generator():
nonlocal saw_first_line_before_second_yield

yield "first line\n"

for _ in range(20):
if pager_output.exists() and pager_output.read_text() == "first line\n":
saw_first_line_before_second_yield = True
break

time.sleep(0.01)

yield "second line\n"

monkeypatch.setattr(subprocess, "Popen", fake_popen)

assert click._termui_impl._pipepager(
generator(),
["cat"],
color=False,
)
assert saw_first_line_before_second_yield
assert pager_output.read_text() == "first line\nsecond line\n"


def test_echo_color_flag(monkeypatch, capfd):
isatty = True
monkeypatch.setattr(click._compat, "isatty", lambda x: isatty)
Expand Down
Loading