diff --git a/lib/logitech_receiver/base.py b/lib/logitech_receiver/base.py index 05903bb5..24234e7d 100644 --- a/lib/logitech_receiver/base.py +++ b/lib/logitech_receiver/base.py @@ -30,10 +30,11 @@ from time import time as _timestamp import hidapi as _hid -from . import hidpp10 as _hidpp10 +from . import exceptions +from . import hidpp10_constants as _hidpp10_constants from . import hidpp20 as _hidpp20 +from . import hidpp20_constants as _hidpp20_constants from .base_usb import ALL as _RECEIVER_USB_IDS -from .common import KwException as _KwException from .common import strhex as _strhex from .descriptors import DEVICES as _DEVICES @@ -113,29 +114,6 @@ _DEVICE_REQUEST_TIMEOUT = DEFAULT_TIMEOUT # when pinging, be extra patient (no longer) _PING_TIMEOUT = DEFAULT_TIMEOUT -# -# Exceptions that may be raised by this API. -# - - -class NoReceiver(_KwException): - """Raised when trying to talk through a previously open handle, when the - receiver is no longer available. Should only happen if the receiver is - physically disconnected from the machine, or its kernel driver module is - unloaded.""" - pass - - -class NoSuchDevice(_KwException): - """Raised when trying to reach a device number not paired to the receiver.""" - pass - - -class DeviceUnreachable(_KwException): - """Raised when a request is made to an unreachable (turned off) device.""" - pass - - # # # @@ -263,7 +241,7 @@ def write(handle, devnumber, data, long_message=False): except Exception as reason: logger.error('write failed, assuming handle %r no longer available', handle) close(handle) - raise NoReceiver(reason=reason) + raise exceptions.NoReceiver(reason=reason) def read(handle, timeout=DEFAULT_TIMEOUT): @@ -312,7 +290,7 @@ def _read(handle, timeout): except Exception as reason: logger.warning('read failed, assuming handle %r no longer available', handle) close(handle) - raise NoReceiver(reason=reason) + raise exceptions.NoReceiver(reason=reason) if data and check_message(data): # ignore messages that fail check report_id = ord(data[:1]) @@ -343,7 +321,7 @@ def _skip_incoming(handle, ihandle, notifications_hook): except Exception as reason: logger.error('read failed, assuming receiver %s no longer available', handle) close(handle) - raise NoReceiver(reason=reason) + raise exceptions.NoReceiver(reason=reason) if data: if check_message(data): # only process messages that pass check @@ -464,7 +442,7 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error notifications_hook = getattr(handle, 'notifications_hook', None) try: _skip_incoming(handle, ihandle, notifications_hook) - except NoReceiver: + except exceptions.NoReceiver: logger.warning('device or receiver disconnected') return None write(ihandle, devnumber, request_data, long_message) @@ -489,15 +467,15 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error if logger.isEnabledFor(logging.DEBUG): logger.debug( '(%s) device 0x%02X error on request {%04X}: %d = %s', handle, devnumber, request_id, error, - _hidpp10.ERROR[error] + _hidpp10_constants.ERROR[error] ) - return _hidpp10.ERROR[error] if return_error else None + return _hidpp10_constants.ERROR[error] if return_error else None if reply_data[:1] == b'\xFF' and reply_data[1:3] == request_data[:2]: # a HID++ 2.0 feature call returned with an error error = ord(reply_data[3:4]) logger.error( '(%s) device %d error on feature request {%04X}: %d = %s', handle, devnumber, request_id, error, - _hidpp20.ERROR[error] + _hidpp20_constants.ERROR[error] ) raise _hidpp20.FeatureCallError(number=devnumber, request=request_id, error=error, params=params) @@ -549,7 +527,7 @@ def ping(handle, devnumber, long_message=False): notifications_hook = getattr(handle, 'notifications_hook', None) try: _skip_incoming(handle, int(handle), notifications_hook) - except NoReceiver: + except exceptions.NoReceiver: logger.warning('device or receiver disconnected') return @@ -574,13 +552,13 @@ def ping(handle, devnumber, long_message=False): if report_id == HIDPP_SHORT_MESSAGE_ID and reply_data[:1] == b'\x8F' and \ reply_data[1:3] == request_data[:2]: # error response error = ord(reply_data[3:4]) - if error == _hidpp10.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device + if error == _hidpp10_constants.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device return 1.0 - if error == _hidpp10.ERROR.resource_error or error == _hidpp10.ERROR.connection_request_failed: + if error == _hidpp10_constants.ERROR.resource_error or error == _hidpp10_constants.ERROR.connection_request_failed: return # device unreachable - if error == _hidpp10.ERROR.unknown_device: # no paired device with that number + if error == _hidpp10_constants.ERROR.unknown_device: # no paired device with that number logger.error('(%s) device %d error on ping request: unknown device', handle, devnumber) - raise NoSuchDevice(number=devnumber, request=request_id) + raise exceptions.NoSuchDevice(number=devnumber, request=request_id) if notifications_hook: n = make_notification(report_id, reply_devnumber, reply_data) diff --git a/lib/logitech_receiver/device.py b/lib/logitech_receiver/device.py index c8a6dc1b..adc78860 100644 --- a/lib/logitech_receiver/device.py +++ b/lib/logitech_receiver/device.py @@ -28,6 +28,7 @@ import solaar.configuration as _configuration from . import base as _base from . import descriptors as _descriptors +from . import exceptions from . import hidpp10 as _hidpp10 from . import hidpp20 as _hidpp20 from .common import strhex as _strhex @@ -128,7 +129,7 @@ class Device: self.wpid = _hid.find_paired_node_wpid(receiver.path, number) if not self.wpid: logger.error('Unable to get wpid from udev for device %d of %s', number, receiver) - raise _base.NoSuchDevice(number=number, receiver=receiver, error='Not present 27Mhz device') + raise exceptions.NoSuchDevice(number=number, receiver=receiver, error='Not present 27Mhz device') kind = receiver.get_kind_from_index(number) self._kind = _hidpp10.DEVICE_KIND[kind] else: # get information from pairing registers @@ -136,7 +137,7 @@ class Device: self.update_pairing_information() self.update_extended_pairing_information() if not self.wpid and not self._serial: # if neither then the device almost certainly wasn't found - raise _base.NoSuchDevice(number=number, receiver=receiver, error='no wpid or serial') + raise exceptions.NoSuchDevice(number=number, receiver=receiver, error='no wpid or serial') # the wpid is set to None on this object when the device is unpaired assert self.wpid is not None, 'failed to read wpid: device %d of %s' % (number, receiver) @@ -207,7 +208,7 @@ class Device: if not self.online: # be very defensive try: self.ping() - except _base.NoSuchDevice: + except exceptions.NoSuchDevice: pass if self.online and self.protocol >= 2.0: self._name = _hidpp20.get_name(self) @@ -513,7 +514,7 @@ class Device: def __str__(self): try: name = self.name or self.codename or '?' - except _base.NoSuchDevice: + except exceptions.NoSuchDevice: name = 'name not available' return '' % (self.number, self.wpid or self.product_id, name, self.serial) diff --git a/lib/logitech_receiver/exceptions.py b/lib/logitech_receiver/exceptions.py new file mode 100644 index 00000000..5ae67c59 --- /dev/null +++ b/lib/logitech_receiver/exceptions.py @@ -0,0 +1,33 @@ +from .common import KwException as _KwException + +# +# Exceptions that may be raised by this API. +# + + +class NoReceiver(_KwException): + """Raised when trying to talk through a previously open handle, when the + receiver is no longer available. Should only happen if the receiver is + physically disconnected from the machine, or its kernel driver module is + unloaded.""" + pass + + +class NoSuchDevice(_KwException): + """Raised when trying to reach a device number not paired to the receiver.""" + pass + + +class DeviceUnreachable(_KwException): + """Raised when a request is made to an unreachable (turned off) device.""" + pass + + +class FeatureNotSupported(_KwException): + """Raised when trying to request a feature not supported by the device.""" + pass + + +class FeatureCallError(_KwException): + """Raised if the device replied to a feature call with an error.""" + pass diff --git a/lib/logitech_receiver/hidpp20.py b/lib/logitech_receiver/hidpp20.py index 7fc2f0ae..94ca822b 100644 --- a/lib/logitech_receiver/hidpp20.py +++ b/lib/logitech_receiver/hidpp20.py @@ -28,10 +28,9 @@ from typing import List, Optional import yaml as _yaml -from . import special_keys +from . import exceptions, special_keys from .common import BATTERY_APPROX as _BATTERY_APPROX from .common import FirmwareInfo as _FirmwareInfo -from .common import KwException as _KwException from .common import NamedInt as _NamedInt from .common import NamedInts as _NamedInts from .common import UnsortedNamedInts as _UnsortedNamedInts @@ -228,21 +227,6 @@ ERROR = _NamedInts( # -class FeatureNotSupported(_KwException): - """Raised when trying to request a feature not supported by the device.""" - pass - - -class FeatureCallError(_KwException): - """Raised if the device replied to a feature call with an error.""" - pass - - -# -# -# - - class FeaturesArray(dict): def __init__(self, device): @@ -463,8 +447,8 @@ class ReprogrammableKeyV4(ReprogrammableKey): mapping_flags_2 = 0 self._mapping_flags = mapping_flags_1 | (mapping_flags_2 << 8) else: - raise FeatureCallError(msg='No reply from device.') - except FeatureCallError: # if the key hasn't ever been configured then the read may fail so only produce a warning + raise exceptions.FeatureCallError(msg='No reply from device.') + except exceptions.FeatureCallError: # if the key hasn't ever been configured then the read may fail so only produce a warning if logger.isEnabledFor(logging.WARNING): logger.warn( f'Feature Call Error in _getCidReporting on device {self._device} for cid {self._cid} - use defaults' @@ -499,7 +483,7 @@ class ReprogrammableKeyV4(ReprogrammableKey): bfield = 0 for f, v in flags.items(): if v and FLAG_TO_CAPABILITY[f] not in self.flags: - raise FeatureNotSupported( + raise exceptions.FeatureNotSupported( msg=f'Tried to set mapping flag "{f}" on control "{self.key}" ' + f'which does not support "{FLAG_TO_CAPABILITY[f]}" on device {self._device}.' ) @@ -512,7 +496,7 @@ class ReprogrammableKeyV4(ReprogrammableKey): self._mapping_flags &= ~int(f) if remap != 0 and remap not in self.remappable_to: - raise FeatureNotSupported( + raise exceptions.FeatureNotSupported( msg=f'Tried to remap control "{self.key}" to a control ID {remap} which it is not remappable to ' + f'on device {self._device}.' ) @@ -1045,7 +1029,7 @@ class Spec: def read(self): try: value = feature_request(self._device, FEATURE.GESTURE_2, 0x50, self.id, 0xFF) - except FeatureCallError: # some calls produce an error (notably spec 5 multiplier on K400Plus) + except exceptions.FeatureCallError: # some calls produce an error (notably spec 5 multiplier on K400Plus) if logger.isEnabledFor(logging.WARNING): logger.warn(f'Feature Call Error reading Gesture Spec on device {self._device} for spec {self.id} - use None') return None @@ -1130,7 +1114,7 @@ class Backlight: def __init__(self, device): response = device.feature_request(FEATURE.BACKLIGHT2, 0x00) if not response: - raise FeatureCallError(msg='No reply from device.') + raise exceptions.FeatureCallError(msg='No reply from device.') self.device = device self.enabled, self.options, supported, effects, self.level, self.dho, self.dhi, self.dpow = _unpack( ' %s', self, 'enabled' if enable else 'disabled', flag_names) return flag_bits @@ -154,33 +156,33 @@ class Receiver: pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n) if pair_info: wpid = _strhex(pair_info[3:4]) + _strhex(pair_info[2:3]) - kind = _hidpp10.DEVICE_KIND[ord(pair_info[1:2]) & 0x0F] + kind = _hidpp10_constants.DEVICE_KIND[ord(pair_info[1:2]) & 0x0F] return wpid, kind, 0 else: - raise _base.NoSuchDevice(number=n, receiver=self, error='read Bolt wpid') + raise exceptions.NoSuchDevice(number=n, receiver=self, error='read Bolt wpid') wpid = 0 kind = None polling_rate = None pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1) if pair_info: # may be either a Unifying receiver, or an Unifying-ready receiver wpid = _strhex(pair_info[3:5]) - kind = _hidpp10.DEVICE_KIND[ord(pair_info[7:8]) & 0x0F] + kind = _hidpp10_constants.DEVICE_KIND[ord(pair_info[7:8]) & 0x0F] polling_rate = str(ord(pair_info[2:3])) + 'ms' elif self.receiver_kind == '27Mz': # 27Mhz receiver, fill extracting WPID from udev path wpid = _hid.find_paired_node_wpid(self.path, n) if not wpid: logger.error('Unable to get wpid from udev for device %d of %s', n, self) - raise _base.NoSuchDevice(number=n, receiver=self, error='Not present 27Mhz device') + raise exceptions.NoSuchDevice(number=n, receiver=self, error='Not present 27Mhz device') kind = _hidpp10.DEVICE_KIND[self.get_kind_from_index(n)] elif not self.receiver_kind == 'unifying': # unifying protocol not supported, may be an old Nano receiver device_info = self.read_register(_R.receiver_info, 0x04) if device_info: wpid = _strhex(device_info[3:5]) - kind = _hidpp10.DEVICE_KIND[0x00] # unknown kind + kind = _hidpp10_constants.DEVICE_KIND[0x00] # unknown kind else: - raise _base.NoSuchDevice(number=n, receiver=self, error='read pairing information - non-unifying') + raise exceptions.NoSuchDevice(number=n, receiver=self, error='read pairing information - non-unifying') else: - raise _base.NoSuchDevice(number=n, receiver=self, error='read pairing information') + raise exceptions.NoSuchDevice(number=n, receiver=self, error='read pairing information') return wpid, kind, polling_rate def device_extended_pairing_information(self, n): @@ -217,7 +219,7 @@ class Receiver: kind = 3 else: # unknown device number on 27Mhz receiver logger.error('failed to calculate device kind for device %d of %s', index, self) - raise _base.NoSuchDevice(number=index, receiver=self, error='Unknown 27Mhz device number') + raise exceptions.NoSuchDevice(number=index, receiver=self, error='Unknown 27Mhz device number') return kind def notify_devices(self): @@ -239,7 +241,7 @@ class Receiver: logger.info('%s: found new device %d (%s)', self, number, dev.wpid) self._devices[number] = dev return dev - except _base.NoSuchDevice as e: + except exceptions.NoSuchDevice as e: logger.warning('register new device failed for %s device %d error %s', e.receiver, e.number, e.error) logger.warning('%s: looked for device %d, not found', self, number) diff --git a/lib/solaar/cli/show.py b/lib/solaar/cli/show.py index 9b204d0a..307366ff 100644 --- a/lib/solaar/cli/show.py +++ b/lib/solaar/cli/show.py @@ -16,7 +16,7 @@ ## with this program; if not, write to the Free Software Foundation, Inc., ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. -from logitech_receiver import base as _base +from logitech_receiver import exceptions from logitech_receiver import hidpp10 as _hidpp10 from logitech_receiver import hidpp20 as _hidpp20 from logitech_receiver import receiver as _receiver @@ -85,7 +85,7 @@ def _print_device(dev, num=None): # try to ping the device to see if it actually exists and to wake it up try: dev.ping() - except _base.NoSuchDevice: + except exceptions.NoSuchDevice: print(' %s: Device not found' % num or dev.number) return diff --git a/lib/solaar/listener.py b/lib/solaar/listener.py index 0e4c027c..717dc385 100644 --- a/lib/solaar/listener.py +++ b/lib/solaar/listener.py @@ -28,7 +28,8 @@ import logitech_receiver.device as _device import logitech_receiver.receiver as _receiver from logitech_receiver import base as _base -from logitech_receiver import hidpp10 as _hidpp10 +from logitech_receiver import exceptions +from logitech_receiver import hidpp10_constants as _hidpp10_constants from logitech_receiver import listener as _listener from logitech_receiver import notifications as _notifications from logitech_receiver import status as _status @@ -42,8 +43,8 @@ from gi.repository import GLib # NOQA: E402 # isort:skip logger = logging.getLogger(__name__) -_R = _hidpp10.REGISTERS -_IR = _hidpp10.INFO_SUBREGISTERS +_R = _hidpp10_constants.REGISTERS +_IR = _hidpp10_constants.INFO_SUBREGISTERS # # @@ -89,7 +90,7 @@ class ReceiverListener(_listener.EventsListener): logger.info('%s: notifications listener has started (%s)', self.receiver, self.receiver.handle) nfs = self.receiver.enable_connection_notifications() if logger.isEnabledFor(logging.WARNING): - if not self.receiver.isDevice and not ((nfs if nfs else 0) & _hidpp10.NOTIFICATION_FLAG.wireless): + if not self.receiver.isDevice and not ((nfs if nfs else 0) & _hidpp10_constants.NOTIFICATION_FLAG.wireless): logger.warning( 'Receiver on %s might not support connection notifications, GUI might not show its devices', self.receiver.path @@ -403,7 +404,7 @@ def _process_add(device_info, retry): _error_callback('permissions', device_info.path) else: _error_callback('nodevice', device_info.path) - except _base.NoReceiver: + except exceptions.NoReceiver: _error_callback('nodevice', device_info.path)