Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ The following people have contributed to the development of Rich:
- [Paul McGuire](https://github.com/ptmcg)
- [Antony Milne](https://github.com/AntonyMilneQB)
- [Michael Milton](https://github.com/multimeric)
- [Theodore Ni](https://github.com/tjni)
- [Martina Oefelein](https://github.com/oefe)
- [Joel Ostblom](https://github.com/joelostblom)
- [Nathan Page](https://github.com/nathanrpage97)
Expand Down
33 changes: 23 additions & 10 deletions tests/test_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -1006,17 +1006,30 @@ def test_reenable_highlighting() -> None:
@pytest.mark.skipif(sys.platform == "win32", reason="does not run on windows")
def test_brokenpipeerror() -> None:
"""Test BrokenPipe works as expected."""
which_py, which_head = (["which", cmd] for cmd in ("python", "head"))
rich_cmd = "python -m rich".split()
for cmd in [which_py, which_head, rich_cmd]:
check = subprocess.run(cmd).returncode
if check != 0:
return # Only test on suitable Unix platforms
head_cmd = "head -1".split()
proc1 = subprocess.Popen(rich_cmd, stdout=subprocess.PIPE)
proc2 = subprocess.Popen(head_cmd, stdin=proc1.stdout, stdout=subprocess.PIPE)

# We write enough output to hopefully keep the writing process busy and give
# enough time for the reading process to exit, creating a BrokenPipeError.
writer = (
"from rich.console import Console\n"
"console = Console(force_terminal=True, width=240)\n"
"payload = 'X' * 200\n"
"for index in range(10000):\n"
" console.print(f'{index:06d} {payload}')\n"
)
reader = "import sys; sys.stdin.buffer.read(1)"
proc1 = subprocess.Popen(
[sys.executable, "-c", writer],
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
)
proc2 = subprocess.Popen(
[sys.executable, "-c", reader],
stdin=proc1.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
)
proc1.stdout.close()
output, _ = proc2.communicate()
proc2.communicate()
proc1.wait()
proc2.wait()
assert proc1.returncode == 1
Expand Down