From d0a3e474c7890376aed01a7518ebbca10fe68e5d Mon Sep 17 00:00:00 2001 From: MattHag <16444067+MattHag@users.noreply.github.com> Date: Thu, 16 May 2024 21:58:22 +0200 Subject: [PATCH] hidapi: Unify imports in hidapi package (#2487) Remove all 'import xyz as _xyz' and favor import of module name to get more context in the code. Related #2273 --- lib/hidapi/__init__.py | 4 +-- lib/hidapi/hidapi.py | 5 +-- lib/hidapi/hidconsole.py | 26 +++++++-------- lib/hidapi/udev.py | 69 +++++++++++++++++++--------------------- 4 files changed, 50 insertions(+), 54 deletions(-) diff --git a/lib/hidapi/__init__.py b/lib/hidapi/__init__.py index f9aa38ef..e1add9e2 100644 --- a/lib/hidapi/__init__.py +++ b/lib/hidapi/__init__.py @@ -15,9 +15,9 @@ ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. """Generic Human Interface Device API.""" -import platform as _platform +import platform -if _platform.system() in ("Darwin", "Windows"): +if platform.system() in ("Darwin", "Windows"): from hidapi.hidapi import close # noqa: F401 from hidapi.hidapi import enumerate # noqa: F401 from hidapi.hidapi import find_paired_node # noqa: F401 diff --git a/lib/hidapi/hidapi.py b/lib/hidapi/hidapi.py index 87721226..6e11617c 100644 --- a/lib/hidapi/hidapi.py +++ b/lib/hidapi/hidapi.py @@ -22,10 +22,11 @@ See https://github.com/libusb/hidapi for how to obtain binaries. Parts of this code are adapted from https://github.com/apmorton/pyhidapi which is MIT licensed. """ + import atexit import ctypes import logging -import platform as _platform +import platform from collections import namedtuple from threading import Thread @@ -172,7 +173,7 @@ atexit.register(_hidapi.hid_exit) # Solaar opens the same device more than once which will fail unless we # allow non-exclusive opening. On windows opening with shared access is # the default, for macOS we need to set it explicitly. -if _platform.system() == "Darwin": +if platform.system() == "Darwin": _hidapi.hid_darwin_set_open_exclusive.argtypes = [ctypes.c_int] _hidapi.hid_darwin_set_open_exclusive.restype = None _hidapi.hid_darwin_set_open_exclusive(0) diff --git a/lib/hidapi/hidconsole.py b/lib/hidapi/hidconsole.py index c706e5af..36e89bf1 100644 --- a/lib/hidapi/hidconsole.py +++ b/lib/hidapi/hidconsole.py @@ -23,11 +23,11 @@ import time from binascii import hexlify from binascii import unhexlify -from select import select as _select +from select import select from threading import Lock from threading import Thread -import hidapi as _hid +import hidapi interactive = os.isatty(0) prompt = "?? Input: " if interactive else "" @@ -86,7 +86,7 @@ def _error(text, scroll=False): def _continuous_read(handle, timeout=2000): while True: try: - reply = _hid.read(handle, 128, timeout) + reply = hidapi.read(handle, 128, timeout) except OSError as e: _error("Read failed, aborting: " + str(e), True) break @@ -109,7 +109,7 @@ def _validate_input(line, hidpp=False): if data[:1] not in b"\x10\x11": _error("Invalid HID++ request: first byte must be 0x10 or 0x11") return None - if data[1:2] not in b"\xFF\x00\x01\x02\x03\x04\x05\x06\x07": + if data[1:2] not in b"\xff\x00\x01\x02\x03\x04\x05\x06\x07": _error("Invalid HID++ request: second byte must be 0xFF or one of 0x00..0x07") return None if data[:1] == b"\x10": @@ -135,7 +135,7 @@ def _open(args): device = args.device if args.hidpp and not device: - for d in _hid.enumerate(matchfn): + for d in hidapi.enumerate(matchfn): if d.driver == "logitech-djreceiver": device = d.path break @@ -145,19 +145,19 @@ def _open(args): sys.exit("!! Device path required.") print(".. Opening device", device) - handle = _hid.open_path(device) + handle = hidapi.open_path(device) if not handle: sys.exit(f"!! Failed to open {device}, aborting.") print( ".. Opened handle %r, vendor %r product %r serial %r." - % (handle, _hid.get_manufacturer(handle), _hid.get_product(handle), _hid.get_serial(handle)) + % (handle, hidapi.get_manufacturer(handle), hidapi.get_product(handle), hidapi.get_serial(handle)) ) if args.hidpp: - if _hid.get_manufacturer(handle) is not None and _hid.get_manufacturer(handle) != b"Logitech": + if hidapi.get_manufacturer(handle) is not None and hidapi.get_manufacturer(handle) != b"Logitech": sys.exit("!! Only Logitech devices support the HID++ protocol.") print(".. HID++ validation enabled.") else: - if _hid.get_manufacturer(handle) == b"Logitech" and b"Receiver" in _hid.get_product(handle): + if hidapi.get_manufacturer(handle) == b"Logitech" and b"Receiver" in hidapi.get_product(handle): args.hidpp = True print(".. Logitech receiver detected, HID++ validation enabled.") @@ -218,11 +218,11 @@ def main(): continue _print("<<", data) - _hid.write(handle, data) + hidapi.write(handle, data) # wait for some kind of reply if args.hidpp and not interactive: - rlist, wlist, xlist = _select([handle], [], [], 1) - if data[1:2] == b"\xFF": + rlist, wlist, xlist = select([handle], [], [], 1) + if data[1:2] == b"\xff": # the receiver will reply very fast, in a few milliseconds time.sleep(0.010) else: @@ -236,7 +236,7 @@ def main(): finally: print(f".. Closing handle {handle!r}") - _hid.close(handle) + hidapi.close(handle) if interactive: readline.write_history_file(args.history) diff --git a/lib/hidapi/udev.py b/lib/hidapi/udev.py index 198b61c9..ed1bfe5d 100644 --- a/lib/hidapi/udev.py +++ b/lib/hidapi/udev.py @@ -22,25 +22,20 @@ The docstrings are mostly copied from the hidapi API header, with changes where necessary. """ -import errno as _errno +import errno import logging -import os as _os -import warnings as _warnings +import os +import warnings # the tuple object we'll expose when enumerating devices from collections import namedtuple -from select import select as _select +from select import select from time import sleep -from time import time as _timestamp +from time import time import gi - -from pyudev import Context as _Context -from pyudev import Device as _Device -from pyudev import DeviceNotFoundError -from pyudev import Devices as _Devices -from pyudev import Monitor as _Monitor +import pyudev gi.require_version("Gdk", "3.0") from gi.repository import GLib # NOQA: E402 @@ -111,14 +106,14 @@ def _match(action, device, filterfn): return # these are devices connected through a receiver so don't pick them up here try: # if report descriptor does not indicate HID++ capabilities then this device is not of interest to Solaar - from hid_parser import ReportDescriptor as _ReportDescriptor + from hid_parser import ReportDescriptor hidpp_short = hidpp_long = False devfile = "/sys" + hid_device.properties.get("DEVPATH") + "/report_descriptor" with fileopen(devfile, "rb") as fd: - with _warnings.catch_warnings(): - _warnings.simplefilter("ignore") - rd = _ReportDescriptor(fd.read()) + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + rd = ReportDescriptor(fd.read()) hidpp_short = 0x10 in rd.input_report_ids and 6 * 8 == int(rd.get_input_report_size(0x10)) # and _Usage(0xFF00, 0x0001) in rd.get_input_items(0x10)[0].usages # be more permissive hidpp_long = 0x11 in rd.input_report_ids and 19 * 8 == int(rd.get_input_report_size(0x11)) @@ -207,29 +202,29 @@ def _match(action, device, filterfn): def find_paired_node(receiver_path, index, timeout): """Find the node of a device paired with a receiver""" - context = _Context() - receiver_phys = _Devices.from_device_file(context, receiver_path).find_parent("hid").get("HID_PHYS") + context = pyudev.Context() + receiver_phys = pyudev.Devices.from_device_file(context, receiver_path).find_parent("hid").get("HID_PHYS") if not receiver_phys: return None phys = f"{receiver_phys}:{index}" # noqa: E231 - timeout += _timestamp() - delta = _timestamp() + timeout += time() + delta = time() while delta < timeout: for dev in context.list_devices(subsystem="hidraw"): dev_phys = dev.find_parent("hid").get("HID_PHYS") if dev_phys and dev_phys == phys: return dev.device_node - delta = _timestamp() + delta = time() return None def find_paired_node_wpid(receiver_path, index): """Find the node of a device paired with a receiver, get wpid from udev""" - context = _Context() - receiver_phys = _Devices.from_device_file(context, receiver_path).find_parent("hid").get("HID_PHYS") + context = pyudev.Context() + receiver_phys = pyudev.Devices.from_device_file(context, receiver_path).find_parent("hid").get("HID_PHYS") if not receiver_phys: return None @@ -248,7 +243,7 @@ def find_paired_node_wpid(receiver_path, index): def monitor_glib(callback, filterfn): - c = _Context() + c = pyudev.Context() # already existing devices # for device in c.list_devices(subsystem='hidraw'): @@ -259,7 +254,7 @@ def monitor_glib(callback, filterfn): # GLib.idle_add(callback, 'add', d_info) # break - m = _Monitor.from_netlink(c) + m = pyudev.Monitor.from_netlink(c) m.filter_by(subsystem="hidraw") def _process_udev_event(monitor, condition, cb, filterfn): @@ -306,7 +301,7 @@ def enumerate(filterfn): if logger.isEnabledFor(logging.DEBUG): logger.debug("Starting dbus enumeration") - for dev in _Context().list_devices(subsystem="hidraw"): + for dev in pyudev.Context().list_devices(subsystem="hidraw"): dev_info = _match("add", dev, filterfn) if dev_info: yield dev_info @@ -343,10 +338,10 @@ def open_path(device_path): while retrycount < 3: retrycount += 1 try: - return _os.open(device_path, _os.O_RDWR | _os.O_SYNC) + return os.open(device_path, os.O_RDWR | os.O_SYNC) except OSError as e: logger.info("OPEN PATH FAILED %s ERROR %s %s", device_path, e.errno, e) - if e.errno == _errno.EACCES: + if e.errno == errno.EACCES: sleep(0.1) else: raise @@ -358,7 +353,7 @@ def close(device_handle): :param device_handle: a device handle returned by open() or open_path(). """ assert device_handle - _os.close(device_handle) + os.close(device_handle) def write(device_handle, data): @@ -390,14 +385,14 @@ def write(device_handle, data): while retrycount < 3: try: retrycount += 1 - bytes_written = _os.write(device_handle, data) + bytes_written = os.write(device_handle, data) except OSError as e: - if e.errno == _errno.EPIPE: + if e.errno == errno.EPIPE: sleep(0.1) else: break if bytes_written != len(data): - raise OSError(_errno.EIO, f"written {int(bytes_written)} bytes out of expected {len(data)}") + raise OSError(errno.EIO, f"written {int(bytes_written)} bytes out of expected {len(data)}") def read(device_handle, bytes_count, timeout_ms=-1): @@ -418,15 +413,15 @@ def read(device_handle, bytes_count, timeout_ms=-1): """ assert device_handle timeout = None if timeout_ms < 0 else timeout_ms / 1000.0 - rlist, wlist, xlist = _select([device_handle], [], [device_handle], timeout) + rlist, wlist, xlist = select([device_handle], [], [device_handle], timeout) if xlist: assert xlist == [device_handle] - raise OSError(_errno.EIO, f"exception on file descriptor {int(device_handle)}") + raise OSError(errno.EIO, f"exception on file descriptor {int(device_handle)}") if rlist: assert rlist == [device_handle] - data = _os.read(device_handle, bytes_count) + data = os.read(device_handle, bytes_count) assert data is not None assert isinstance(data, bytes), (repr(data), type(data)) return data @@ -482,10 +477,10 @@ def get_indexed_string(device_handle, index): return None assert device_handle - stat = _os.fstat(device_handle) + stat = os.fstat(device_handle) try: - dev = _Device.from_device_number(_Context(), "char", stat.st_rdev) - except (DeviceNotFoundError, ValueError): + dev = pyudev.Device.from_device_number(pyudev.Context(), "char", stat.st_rdev) + except (pyudev.DeviceNotFoundError, ValueError): return None hid_dev = dev.find_parent("hid")