Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1017,9 +1017,24 @@ ez-reset has definitions for (and thus *theoretically* supports) the following p

## Requirements

- Windows
- Windows or macOS
- Supported Epson printer connected via USB

### macOS support

The app can run on macOS using USB Printer Class via `pyusb`, but you must install `libusb` first:

```bash
brew install libusb
```

Then install and run as usual:

```bash
pip install .
python -m ez_reset
```

## Installation (easy)

Grab a prebuilt binary from the *Releases* tab or [download directly from here](https://github.com/CiRIP/ez-reset/releases/latest/download/ez-reset.exe).
Expand Down
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ description = "Easy reset tool for Windows over USB for printers with a certain
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
"pywin32>=311",
"pywin32>=311; platform_system == 'Windows'",
"pyusb>=1.3.1; platform_system != 'Windows'",
]

[tool.ruff]
Expand Down
32 changes: 30 additions & 2 deletions src/ez_reset/__main__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import platform
import tkinter as tk
import traceback
from collections.abc import Iterable
Expand All @@ -9,7 +10,24 @@
from ez_reset.printer import Printer
from ez_reset.status import InkColor, InkLevel
from ez_reset.utils import parse_identifier
from ez_reset.win_usbprint import USBPRINTTransport, enumerate_printers

if platform.system() == "Windows":
from ez_reset.win_usbprint import USBPRINTTransport, enumerate_printers

USB_BACKEND_AVAILABLE = True
USB_BACKEND_MESSAGE = ""
else:
try:
from ez_reset.usbprint import USBPRINTTransport, enumerate_printers
except ImportError:
USB_BACKEND_AVAILABLE = False
USB_BACKEND_MESSAGE = "Install non-Windows USB dependencies (pyusb + libusb) to use this app."

def enumerate_printers() -> Iterable[str]:
return ()
else:
USB_BACKEND_AVAILABLE = True
USB_BACKEND_MESSAGE = ""


class PrinterList(ttk.Frame):
Expand All @@ -22,6 +40,11 @@ def __init__(self, master: tk.Misc) -> None:
self.list.pack(fill=tk.BOTH, expand=True)
self.list.bind("<Double-Button-1>", self.open_printer)

if not USB_BACKEND_AVAILABLE:
self.list.insert(0, USB_BACKEND_MESSAGE)
self.list.configure(state=tk.DISABLED)
return

self.update_printers()

def update_printers(self) -> None:
Expand All @@ -33,6 +56,10 @@ def update_printers(self) -> None:
self.list.insert(idx, device)

def open_printer(self, _event: tk.Event) -> None:
if not USB_BACKEND_AVAILABLE:
messagebox.showinfo("Unsupported platform", USB_BACKEND_MESSAGE)
return

selected_index = self.list.curselection()[0]

usb_transport = USBPRINTTransport(self._printers[selected_index]).__enter__()
Expand Down Expand Up @@ -197,7 +224,8 @@ def update_level(self, level: int) -> None:
root = tk.Tk()
root.title("ez-reset")
style = ttk.Style(root)
style.theme_use("winnative")
if platform.system() == "Windows":
style.theme_use("winnative")


def show_error(self, *args) -> None: # noqa: ANN001,ANN002,ARG001
Expand Down
2 changes: 2 additions & 0 deletions src/ez_reset/usbprint/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .transport import USBPRINTTransport as USBPRINTTransport
from .transport import enumerate_printers as enumerate_printers
191 changes: 191 additions & 0 deletions src/ez_reset/usbprint/transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
import logging
import time
from types import TracebackType
from typing import Self

import usb.core
import usb.util

from ez_reset.transport import Transport

logger = logging.getLogger(__name__)

MAX_TRANSFER_SIZE = 0x400000
READ_TIMEOUT_MS = 150
WRITE_TIMEOUT_MS = 5000

PRINTER_CLASS = 0x07
GET_DEVICE_ID = 0x00
SOFT_RESET = 0x02


def _is_printer_device(device: usb.core.Device) -> bool:
for config in device:
for interface in config:
if interface.bInterfaceClass == PRINTER_CLASS:
return True

return False


def _find_printer_interface(device: usb.core.Device) -> tuple[int, int]:
for config in device:
for interface in config:
if interface.bInterfaceClass == PRINTER_CLASS:
return config.bConfigurationValue, interface.bInterfaceNumber

msg = f"USB device {device.idVendor:04x}:{device.idProduct:04x} has no printer interface"
raise RuntimeError(msg)


def _format_path(device: usb.core.Device) -> str:
return f"{device.idVendor:04x}:{device.idProduct:04x}:{device.bus:03d}:{device.address:03d}"


def enumerate_printers() -> list[str]:
printers: list[str] = []
devices = usb.core.find(find_all=True, custom_match=_is_printer_device)

for device in devices:
path = _format_path(device)
product = usb.util.get_string(device, device.iProduct) or "Unknown printer"
printers.append(f"{path} {product}")

return printers


class USBPRINTTransport(Transport):
def __init__(self, path: str) -> None:
self.path = path
self.closed = True

self._buffer = b""
self._device: usb.core.Device | None = None
self._interface: int | None = None
self._ep_in: usb.core.Endpoint | None = None
self._ep_out: usb.core.Endpoint | None = None
self._detached_kernel = False

def __enter__(self) -> Self:
try:
vid, pid, bus, address = (int(v, 16) for v in self.path.split(" ", 1)[0].split(":"))
except (ValueError, IndexError) as exc:
msg = f"Invalid USB printer path: {self.path}"
raise ValueError(msg) from exc

device = usb.core.find(idVendor=vid, idProduct=pid, bus=bus, address=address)
if not device:
msg = f"USB printer not found: {self.path}"
raise OSError(msg)

cfg_value, interface = _find_printer_interface(device)
device.set_configuration(cfg_value)

if device.is_kernel_driver_active(interface):
device.detach_kernel_driver(interface)
self._detached_kernel = True

usb.util.claim_interface(device, interface)
configuration = device.get_active_configuration()
intf = configuration[(interface, 0)]

ep_out = usb.util.find_descriptor(
intf,
custom_match=lambda endpoint: usb.util.endpoint_direction(endpoint.bEndpointAddress)
== usb.util.ENDPOINT_OUT
and usb.util.endpoint_type(endpoint.bmAttributes) == usb.util.ENDPOINT_TYPE_BULK,
)
ep_in = usb.util.find_descriptor(
intf,
custom_match=lambda endpoint: usb.util.endpoint_direction(endpoint.bEndpointAddress)
== usb.util.ENDPOINT_IN
and usb.util.endpoint_type(endpoint.bmAttributes) == usb.util.ENDPOINT_TYPE_BULK,
)

if not ep_out or not ep_in:
msg = f"USB printer interface is missing bulk endpoints: {self.path}"
raise OSError(msg)

# Clear stale state before opening a D4 session.
device.ctrl_transfer(0x21, SOFT_RESET, 0, interface)

self._device = device
self._interface = interface
self._ep_in = ep_in
self._ep_out = ep_out
self.closed = False

return self

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
) -> bool:
if self._device and self._interface is not None:
usb.util.release_interface(self._device, self._interface)
if self._detached_kernel:
self._device.attach_kernel_driver(self._interface)

usb.util.dispose_resources(self._device)

self._device = None
self._interface = None
self._ep_in = None
self._ep_out = None
self._detached_kernel = False
self.closed = True

return False

def write(self, data: bytes) -> None:
if self.closed or not self._ep_out:
msg = f"Handle to USBPRINT device {self.path} is closed"
raise OSError(msg)

bytes_written = self._ep_out.write(data, timeout=WRITE_TIMEOUT_MS)
if bytes_written != len(data):
msg = f"Short USB write: expected {len(data)}, got {bytes_written}"
raise OSError(msg)

def read(self, size: int) -> bytes:
if self.closed or not self._ep_in:
msg = f"Handle to USBPRINT device {self.path} is closed"
raise OSError(msg)

while len(self._buffer) < size:
try:
data = bytes(self._ep_in.read(MAX_TRANSFER_SIZE, timeout=READ_TIMEOUT_MS))
self._buffer += data
except usb.core.USBTimeoutError:
time.sleep(0.01)

read = self._buffer[:size]
self._buffer = self._buffer[size:]
return read

def drain(self) -> None:
if self.closed or not self._ep_in:
return

while True:
try:
data = self._ep_in.read(MAX_TRANSFER_SIZE, timeout=READ_TIMEOUT_MS)
if len(data) == 0:
return
except usb.core.USBTimeoutError:
return

def identify(self) -> str:
if self.closed or not self._device or self._interface is None:
msg = f"Handle to USBPRINT device {self.path} is closed"
raise OSError(msg)

raw = bytes(self._device.ctrl_transfer(0xA1, GET_DEVICE_ID, 0, self._interface, 1024))
if len(raw) < 3:
return raw.decode("ascii", errors="ignore")

payload_size = int.from_bytes(raw[:2], byteorder="big")
payload = raw[2:payload_size]
return payload.decode("ascii", errors="ignore").strip("\x00")