hidapi: Unify imports in hidapi package (#2487)

Remove all 'import xyz as _xyz' and favor import of module name to
get more context in the code.

Related #2273
This commit is contained in:
MattHag 2024-05-16 21:58:22 +02:00 committed by GitHub
parent f15a50b4b2
commit d0a3e474c7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 50 additions and 54 deletions

View File

@ -15,9 +15,9 @@
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
"""Generic Human Interface Device API.""" """Generic Human Interface Device API."""
import platform as _platform import platform
if _platform.system() in ("Darwin", "Windows"): if platform.system() in ("Darwin", "Windows"):
from hidapi.hidapi import close # noqa: F401 from hidapi.hidapi import close # noqa: F401
from hidapi.hidapi import enumerate # noqa: F401 from hidapi.hidapi import enumerate # noqa: F401
from hidapi.hidapi import find_paired_node # noqa: F401 from hidapi.hidapi import find_paired_node # noqa: F401

View File

@ -22,10 +22,11 @@ See https://github.com/libusb/hidapi for how to obtain binaries.
Parts of this code are adapted from https://github.com/apmorton/pyhidapi Parts of this code are adapted from https://github.com/apmorton/pyhidapi
which is MIT licensed. which is MIT licensed.
""" """
import atexit import atexit
import ctypes import ctypes
import logging import logging
import platform as _platform import platform
from collections import namedtuple from collections import namedtuple
from threading import Thread from threading import Thread
@ -172,7 +173,7 @@ atexit.register(_hidapi.hid_exit)
# Solaar opens the same device more than once which will fail unless we # Solaar opens the same device more than once which will fail unless we
# allow non-exclusive opening. On windows opening with shared access is # allow non-exclusive opening. On windows opening with shared access is
# the default, for macOS we need to set it explicitly. # the default, for macOS we need to set it explicitly.
if _platform.system() == "Darwin": if platform.system() == "Darwin":
_hidapi.hid_darwin_set_open_exclusive.argtypes = [ctypes.c_int] _hidapi.hid_darwin_set_open_exclusive.argtypes = [ctypes.c_int]
_hidapi.hid_darwin_set_open_exclusive.restype = None _hidapi.hid_darwin_set_open_exclusive.restype = None
_hidapi.hid_darwin_set_open_exclusive(0) _hidapi.hid_darwin_set_open_exclusive(0)

View File

@ -23,11 +23,11 @@ import time
from binascii import hexlify from binascii import hexlify
from binascii import unhexlify from binascii import unhexlify
from select import select as _select from select import select
from threading import Lock from threading import Lock
from threading import Thread from threading import Thread
import hidapi as _hid import hidapi
interactive = os.isatty(0) interactive = os.isatty(0)
prompt = "?? Input: " if interactive else "" prompt = "?? Input: " if interactive else ""
@ -86,7 +86,7 @@ def _error(text, scroll=False):
def _continuous_read(handle, timeout=2000): def _continuous_read(handle, timeout=2000):
while True: while True:
try: try:
reply = _hid.read(handle, 128, timeout) reply = hidapi.read(handle, 128, timeout)
except OSError as e: except OSError as e:
_error("Read failed, aborting: " + str(e), True) _error("Read failed, aborting: " + str(e), True)
break break
@ -109,7 +109,7 @@ def _validate_input(line, hidpp=False):
if data[:1] not in b"\x10\x11": if data[:1] not in b"\x10\x11":
_error("Invalid HID++ request: first byte must be 0x10 or 0x11") _error("Invalid HID++ request: first byte must be 0x10 or 0x11")
return None return None
if data[1:2] not in b"\xFF\x00\x01\x02\x03\x04\x05\x06\x07": if data[1:2] not in b"\xff\x00\x01\x02\x03\x04\x05\x06\x07":
_error("Invalid HID++ request: second byte must be 0xFF or one of 0x00..0x07") _error("Invalid HID++ request: second byte must be 0xFF or one of 0x00..0x07")
return None return None
if data[:1] == b"\x10": if data[:1] == b"\x10":
@ -135,7 +135,7 @@ def _open(args):
device = args.device device = args.device
if args.hidpp and not device: if args.hidpp and not device:
for d in _hid.enumerate(matchfn): for d in hidapi.enumerate(matchfn):
if d.driver == "logitech-djreceiver": if d.driver == "logitech-djreceiver":
device = d.path device = d.path
break break
@ -145,19 +145,19 @@ def _open(args):
sys.exit("!! Device path required.") sys.exit("!! Device path required.")
print(".. Opening device", device) print(".. Opening device", device)
handle = _hid.open_path(device) handle = hidapi.open_path(device)
if not handle: if not handle:
sys.exit(f"!! Failed to open {device}, aborting.") sys.exit(f"!! Failed to open {device}, aborting.")
print( print(
".. Opened handle %r, vendor %r product %r serial %r." ".. Opened handle %r, vendor %r product %r serial %r."
% (handle, _hid.get_manufacturer(handle), _hid.get_product(handle), _hid.get_serial(handle)) % (handle, hidapi.get_manufacturer(handle), hidapi.get_product(handle), hidapi.get_serial(handle))
) )
if args.hidpp: if args.hidpp:
if _hid.get_manufacturer(handle) is not None and _hid.get_manufacturer(handle) != b"Logitech": if hidapi.get_manufacturer(handle) is not None and hidapi.get_manufacturer(handle) != b"Logitech":
sys.exit("!! Only Logitech devices support the HID++ protocol.") sys.exit("!! Only Logitech devices support the HID++ protocol.")
print(".. HID++ validation enabled.") print(".. HID++ validation enabled.")
else: else:
if _hid.get_manufacturer(handle) == b"Logitech" and b"Receiver" in _hid.get_product(handle): if hidapi.get_manufacturer(handle) == b"Logitech" and b"Receiver" in hidapi.get_product(handle):
args.hidpp = True args.hidpp = True
print(".. Logitech receiver detected, HID++ validation enabled.") print(".. Logitech receiver detected, HID++ validation enabled.")
@ -218,11 +218,11 @@ def main():
continue continue
_print("<<", data) _print("<<", data)
_hid.write(handle, data) hidapi.write(handle, data)
# wait for some kind of reply # wait for some kind of reply
if args.hidpp and not interactive: if args.hidpp and not interactive:
rlist, wlist, xlist = _select([handle], [], [], 1) rlist, wlist, xlist = select([handle], [], [], 1)
if data[1:2] == b"\xFF": if data[1:2] == b"\xff":
# the receiver will reply very fast, in a few milliseconds # the receiver will reply very fast, in a few milliseconds
time.sleep(0.010) time.sleep(0.010)
else: else:
@ -236,7 +236,7 @@ def main():
finally: finally:
print(f".. Closing handle {handle!r}") print(f".. Closing handle {handle!r}")
_hid.close(handle) hidapi.close(handle)
if interactive: if interactive:
readline.write_history_file(args.history) readline.write_history_file(args.history)

View File

@ -22,25 +22,20 @@ The docstrings are mostly copied from the hidapi API header, with changes where
necessary. necessary.
""" """
import errno as _errno import errno
import logging import logging
import os as _os import os
import warnings as _warnings import warnings
# the tuple object we'll expose when enumerating devices # the tuple object we'll expose when enumerating devices
from collections import namedtuple from collections import namedtuple
from select import select as _select from select import select
from time import sleep from time import sleep
from time import time as _timestamp from time import time
import gi import gi
import pyudev
from pyudev import Context as _Context
from pyudev import Device as _Device
from pyudev import DeviceNotFoundError
from pyudev import Devices as _Devices
from pyudev import Monitor as _Monitor
gi.require_version("Gdk", "3.0") gi.require_version("Gdk", "3.0")
from gi.repository import GLib # NOQA: E402 from gi.repository import GLib # NOQA: E402
@ -111,14 +106,14 @@ def _match(action, device, filterfn):
return # these are devices connected through a receiver so don't pick them up here return # these are devices connected through a receiver so don't pick them up here
try: # if report descriptor does not indicate HID++ capabilities then this device is not of interest to Solaar try: # if report descriptor does not indicate HID++ capabilities then this device is not of interest to Solaar
from hid_parser import ReportDescriptor as _ReportDescriptor from hid_parser import ReportDescriptor
hidpp_short = hidpp_long = False hidpp_short = hidpp_long = False
devfile = "/sys" + hid_device.properties.get("DEVPATH") + "/report_descriptor" devfile = "/sys" + hid_device.properties.get("DEVPATH") + "/report_descriptor"
with fileopen(devfile, "rb") as fd: with fileopen(devfile, "rb") as fd:
with _warnings.catch_warnings(): with warnings.catch_warnings():
_warnings.simplefilter("ignore") warnings.simplefilter("ignore")
rd = _ReportDescriptor(fd.read()) rd = ReportDescriptor(fd.read())
hidpp_short = 0x10 in rd.input_report_ids and 6 * 8 == int(rd.get_input_report_size(0x10)) hidpp_short = 0x10 in rd.input_report_ids and 6 * 8 == int(rd.get_input_report_size(0x10))
# and _Usage(0xFF00, 0x0001) in rd.get_input_items(0x10)[0].usages # be more permissive # and _Usage(0xFF00, 0x0001) in rd.get_input_items(0x10)[0].usages # be more permissive
hidpp_long = 0x11 in rd.input_report_ids and 19 * 8 == int(rd.get_input_report_size(0x11)) hidpp_long = 0x11 in rd.input_report_ids and 19 * 8 == int(rd.get_input_report_size(0x11))
@ -207,29 +202,29 @@ def _match(action, device, filterfn):
def find_paired_node(receiver_path, index, timeout): def find_paired_node(receiver_path, index, timeout):
"""Find the node of a device paired with a receiver""" """Find the node of a device paired with a receiver"""
context = _Context() context = pyudev.Context()
receiver_phys = _Devices.from_device_file(context, receiver_path).find_parent("hid").get("HID_PHYS") receiver_phys = pyudev.Devices.from_device_file(context, receiver_path).find_parent("hid").get("HID_PHYS")
if not receiver_phys: if not receiver_phys:
return None return None
phys = f"{receiver_phys}:{index}" # noqa: E231 phys = f"{receiver_phys}:{index}" # noqa: E231
timeout += _timestamp() timeout += time()
delta = _timestamp() delta = time()
while delta < timeout: while delta < timeout:
for dev in context.list_devices(subsystem="hidraw"): for dev in context.list_devices(subsystem="hidraw"):
dev_phys = dev.find_parent("hid").get("HID_PHYS") dev_phys = dev.find_parent("hid").get("HID_PHYS")
if dev_phys and dev_phys == phys: if dev_phys and dev_phys == phys:
return dev.device_node return dev.device_node
delta = _timestamp() delta = time()
return None return None
def find_paired_node_wpid(receiver_path, index): def find_paired_node_wpid(receiver_path, index):
"""Find the node of a device paired with a receiver, get wpid from udev""" """Find the node of a device paired with a receiver, get wpid from udev"""
context = _Context() context = pyudev.Context()
receiver_phys = _Devices.from_device_file(context, receiver_path).find_parent("hid").get("HID_PHYS") receiver_phys = pyudev.Devices.from_device_file(context, receiver_path).find_parent("hid").get("HID_PHYS")
if not receiver_phys: if not receiver_phys:
return None return None
@ -248,7 +243,7 @@ def find_paired_node_wpid(receiver_path, index):
def monitor_glib(callback, filterfn): def monitor_glib(callback, filterfn):
c = _Context() c = pyudev.Context()
# already existing devices # already existing devices
# for device in c.list_devices(subsystem='hidraw'): # for device in c.list_devices(subsystem='hidraw'):
@ -259,7 +254,7 @@ def monitor_glib(callback, filterfn):
# GLib.idle_add(callback, 'add', d_info) # GLib.idle_add(callback, 'add', d_info)
# break # break
m = _Monitor.from_netlink(c) m = pyudev.Monitor.from_netlink(c)
m.filter_by(subsystem="hidraw") m.filter_by(subsystem="hidraw")
def _process_udev_event(monitor, condition, cb, filterfn): def _process_udev_event(monitor, condition, cb, filterfn):
@ -306,7 +301,7 @@ def enumerate(filterfn):
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
logger.debug("Starting dbus enumeration") logger.debug("Starting dbus enumeration")
for dev in _Context().list_devices(subsystem="hidraw"): for dev in pyudev.Context().list_devices(subsystem="hidraw"):
dev_info = _match("add", dev, filterfn) dev_info = _match("add", dev, filterfn)
if dev_info: if dev_info:
yield dev_info yield dev_info
@ -343,10 +338,10 @@ def open_path(device_path):
while retrycount < 3: while retrycount < 3:
retrycount += 1 retrycount += 1
try: try:
return _os.open(device_path, _os.O_RDWR | _os.O_SYNC) return os.open(device_path, os.O_RDWR | os.O_SYNC)
except OSError as e: except OSError as e:
logger.info("OPEN PATH FAILED %s ERROR %s %s", device_path, e.errno, e) logger.info("OPEN PATH FAILED %s ERROR %s %s", device_path, e.errno, e)
if e.errno == _errno.EACCES: if e.errno == errno.EACCES:
sleep(0.1) sleep(0.1)
else: else:
raise raise
@ -358,7 +353,7 @@ def close(device_handle):
:param device_handle: a device handle returned by open() or open_path(). :param device_handle: a device handle returned by open() or open_path().
""" """
assert device_handle assert device_handle
_os.close(device_handle) os.close(device_handle)
def write(device_handle, data): def write(device_handle, data):
@ -390,14 +385,14 @@ def write(device_handle, data):
while retrycount < 3: while retrycount < 3:
try: try:
retrycount += 1 retrycount += 1
bytes_written = _os.write(device_handle, data) bytes_written = os.write(device_handle, data)
except OSError as e: except OSError as e:
if e.errno == _errno.EPIPE: if e.errno == errno.EPIPE:
sleep(0.1) sleep(0.1)
else: else:
break break
if bytes_written != len(data): if bytes_written != len(data):
raise OSError(_errno.EIO, f"written {int(bytes_written)} bytes out of expected {len(data)}") raise OSError(errno.EIO, f"written {int(bytes_written)} bytes out of expected {len(data)}")
def read(device_handle, bytes_count, timeout_ms=-1): def read(device_handle, bytes_count, timeout_ms=-1):
@ -418,15 +413,15 @@ def read(device_handle, bytes_count, timeout_ms=-1):
""" """
assert device_handle assert device_handle
timeout = None if timeout_ms < 0 else timeout_ms / 1000.0 timeout = None if timeout_ms < 0 else timeout_ms / 1000.0
rlist, wlist, xlist = _select([device_handle], [], [device_handle], timeout) rlist, wlist, xlist = select([device_handle], [], [device_handle], timeout)
if xlist: if xlist:
assert xlist == [device_handle] assert xlist == [device_handle]
raise OSError(_errno.EIO, f"exception on file descriptor {int(device_handle)}") raise OSError(errno.EIO, f"exception on file descriptor {int(device_handle)}")
if rlist: if rlist:
assert rlist == [device_handle] assert rlist == [device_handle]
data = _os.read(device_handle, bytes_count) data = os.read(device_handle, bytes_count)
assert data is not None assert data is not None
assert isinstance(data, bytes), (repr(data), type(data)) assert isinstance(data, bytes), (repr(data), type(data))
return data return data
@ -482,10 +477,10 @@ def get_indexed_string(device_handle, index):
return None return None
assert device_handle assert device_handle
stat = _os.fstat(device_handle) stat = os.fstat(device_handle)
try: try:
dev = _Device.from_device_number(_Context(), "char", stat.st_rdev) dev = pyudev.Device.from_device_number(pyudev.Context(), "char", stat.st_rdev)
except (DeviceNotFoundError, ValueError): except (pyudev.DeviceNotFoundError, ValueError):
return None return None
hid_dev = dev.find_parent("hid") hid_dev = dev.find_parent("hid")