logitech_receiver: Move exceptions into own module

Related #1097
This commit is contained in:
Matthias Hagmann 2024-02-14 20:37:41 +01:00 committed by Peter F. Patel-Schneider
parent fa9494435b
commit e8fdbeee8e
8 changed files with 88 additions and 88 deletions

View File

@ -30,10 +30,11 @@ from time import time as _timestamp
import hidapi as _hid import hidapi as _hid
from . import hidpp10 as _hidpp10 from . import exceptions
from . import hidpp10_constants as _hidpp10_constants
from . import hidpp20 as _hidpp20 from . import hidpp20 as _hidpp20
from . import hidpp20_constants as _hidpp20_constants
from .base_usb import ALL as _RECEIVER_USB_IDS from .base_usb import ALL as _RECEIVER_USB_IDS
from .common import KwException as _KwException
from .common import strhex as _strhex from .common import strhex as _strhex
from .descriptors import DEVICES as _DEVICES from .descriptors import DEVICES as _DEVICES
@ -113,29 +114,6 @@ _DEVICE_REQUEST_TIMEOUT = DEFAULT_TIMEOUT
# when pinging, be extra patient (no longer) # when pinging, be extra patient (no longer)
_PING_TIMEOUT = DEFAULT_TIMEOUT _PING_TIMEOUT = DEFAULT_TIMEOUT
#
# Exceptions that may be raised by this API.
#
class NoReceiver(_KwException):
"""Raised when trying to talk through a previously open handle, when the
receiver is no longer available. Should only happen if the receiver is
physically disconnected from the machine, or its kernel driver module is
unloaded."""
pass
class NoSuchDevice(_KwException):
"""Raised when trying to reach a device number not paired to the receiver."""
pass
class DeviceUnreachable(_KwException):
"""Raised when a request is made to an unreachable (turned off) device."""
pass
# #
# #
# #
@ -263,7 +241,7 @@ def write(handle, devnumber, data, long_message=False):
except Exception as reason: except Exception as reason:
logger.error('write failed, assuming handle %r no longer available', handle) logger.error('write failed, assuming handle %r no longer available', handle)
close(handle) close(handle)
raise NoReceiver(reason=reason) raise exceptions.NoReceiver(reason=reason)
def read(handle, timeout=DEFAULT_TIMEOUT): def read(handle, timeout=DEFAULT_TIMEOUT):
@ -312,7 +290,7 @@ def _read(handle, timeout):
except Exception as reason: except Exception as reason:
logger.warning('read failed, assuming handle %r no longer available', handle) logger.warning('read failed, assuming handle %r no longer available', handle)
close(handle) close(handle)
raise NoReceiver(reason=reason) raise exceptions.NoReceiver(reason=reason)
if data and check_message(data): # ignore messages that fail check if data and check_message(data): # ignore messages that fail check
report_id = ord(data[:1]) report_id = ord(data[:1])
@ -343,7 +321,7 @@ def _skip_incoming(handle, ihandle, notifications_hook):
except Exception as reason: except Exception as reason:
logger.error('read failed, assuming receiver %s no longer available', handle) logger.error('read failed, assuming receiver %s no longer available', handle)
close(handle) close(handle)
raise NoReceiver(reason=reason) raise exceptions.NoReceiver(reason=reason)
if data: if data:
if check_message(data): # only process messages that pass check if check_message(data): # only process messages that pass check
@ -464,7 +442,7 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
notifications_hook = getattr(handle, 'notifications_hook', None) notifications_hook = getattr(handle, 'notifications_hook', None)
try: try:
_skip_incoming(handle, ihandle, notifications_hook) _skip_incoming(handle, ihandle, notifications_hook)
except NoReceiver: except exceptions.NoReceiver:
logger.warning('device or receiver disconnected') logger.warning('device or receiver disconnected')
return None return None
write(ihandle, devnumber, request_data, long_message) write(ihandle, devnumber, request_data, long_message)
@ -489,15 +467,15 @@ def request(handle, devnumber, request_id, *params, no_reply=False, return_error
if logger.isEnabledFor(logging.DEBUG): if logger.isEnabledFor(logging.DEBUG):
logger.debug( logger.debug(
'(%s) device 0x%02X error on request {%04X}: %d = %s', handle, devnumber, request_id, error, '(%s) device 0x%02X error on request {%04X}: %d = %s', handle, devnumber, request_id, error,
_hidpp10.ERROR[error] _hidpp10_constants.ERROR[error]
) )
return _hidpp10.ERROR[error] if return_error else None return _hidpp10_constants.ERROR[error] if return_error else None
if reply_data[:1] == b'\xFF' and reply_data[1:3] == request_data[:2]: if reply_data[:1] == b'\xFF' and reply_data[1:3] == request_data[:2]:
# a HID++ 2.0 feature call returned with an error # a HID++ 2.0 feature call returned with an error
error = ord(reply_data[3:4]) error = ord(reply_data[3:4])
logger.error( logger.error(
'(%s) device %d error on feature request {%04X}: %d = %s', handle, devnumber, request_id, error, '(%s) device %d error on feature request {%04X}: %d = %s', handle, devnumber, request_id, error,
_hidpp20.ERROR[error] _hidpp20_constants.ERROR[error]
) )
raise _hidpp20.FeatureCallError(number=devnumber, request=request_id, error=error, params=params) raise _hidpp20.FeatureCallError(number=devnumber, request=request_id, error=error, params=params)
@ -549,7 +527,7 @@ def ping(handle, devnumber, long_message=False):
notifications_hook = getattr(handle, 'notifications_hook', None) notifications_hook = getattr(handle, 'notifications_hook', None)
try: try:
_skip_incoming(handle, int(handle), notifications_hook) _skip_incoming(handle, int(handle), notifications_hook)
except NoReceiver: except exceptions.NoReceiver:
logger.warning('device or receiver disconnected') logger.warning('device or receiver disconnected')
return return
@ -574,13 +552,13 @@ def ping(handle, devnumber, long_message=False):
if report_id == HIDPP_SHORT_MESSAGE_ID and reply_data[:1] == b'\x8F' and \ if report_id == HIDPP_SHORT_MESSAGE_ID and reply_data[:1] == b'\x8F' and \
reply_data[1:3] == request_data[:2]: # error response reply_data[1:3] == request_data[:2]: # error response
error = ord(reply_data[3:4]) error = ord(reply_data[3:4])
if error == _hidpp10.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device if error == _hidpp10_constants.ERROR.invalid_SubID__command: # a valid reply from a HID++ 1.0 device
return 1.0 return 1.0
if error == _hidpp10.ERROR.resource_error or error == _hidpp10.ERROR.connection_request_failed: if error == _hidpp10_constants.ERROR.resource_error or error == _hidpp10_constants.ERROR.connection_request_failed:
return # device unreachable return # device unreachable
if error == _hidpp10.ERROR.unknown_device: # no paired device with that number if error == _hidpp10_constants.ERROR.unknown_device: # no paired device with that number
logger.error('(%s) device %d error on ping request: unknown device', handle, devnumber) logger.error('(%s) device %d error on ping request: unknown device', handle, devnumber)
raise NoSuchDevice(number=devnumber, request=request_id) raise exceptions.NoSuchDevice(number=devnumber, request=request_id)
if notifications_hook: if notifications_hook:
n = make_notification(report_id, reply_devnumber, reply_data) n = make_notification(report_id, reply_devnumber, reply_data)

View File

@ -28,6 +28,7 @@ import solaar.configuration as _configuration
from . import base as _base from . import base as _base
from . import descriptors as _descriptors from . import descriptors as _descriptors
from . import exceptions
from . import hidpp10 as _hidpp10 from . import hidpp10 as _hidpp10
from . import hidpp20 as _hidpp20 from . import hidpp20 as _hidpp20
from .common import strhex as _strhex from .common import strhex as _strhex
@ -128,7 +129,7 @@ class Device:
self.wpid = _hid.find_paired_node_wpid(receiver.path, number) self.wpid = _hid.find_paired_node_wpid(receiver.path, number)
if not self.wpid: if not self.wpid:
logger.error('Unable to get wpid from udev for device %d of %s', number, receiver) logger.error('Unable to get wpid from udev for device %d of %s', number, receiver)
raise _base.NoSuchDevice(number=number, receiver=receiver, error='Not present 27Mhz device') raise exceptions.NoSuchDevice(number=number, receiver=receiver, error='Not present 27Mhz device')
kind = receiver.get_kind_from_index(number) kind = receiver.get_kind_from_index(number)
self._kind = _hidpp10.DEVICE_KIND[kind] self._kind = _hidpp10.DEVICE_KIND[kind]
else: # get information from pairing registers else: # get information from pairing registers
@ -136,7 +137,7 @@ class Device:
self.update_pairing_information() self.update_pairing_information()
self.update_extended_pairing_information() self.update_extended_pairing_information()
if not self.wpid and not self._serial: # if neither then the device almost certainly wasn't found if not self.wpid and not self._serial: # if neither then the device almost certainly wasn't found
raise _base.NoSuchDevice(number=number, receiver=receiver, error='no wpid or serial') raise exceptions.NoSuchDevice(number=number, receiver=receiver, error='no wpid or serial')
# the wpid is set to None on this object when the device is unpaired # the wpid is set to None on this object when the device is unpaired
assert self.wpid is not None, 'failed to read wpid: device %d of %s' % (number, receiver) assert self.wpid is not None, 'failed to read wpid: device %d of %s' % (number, receiver)
@ -207,7 +208,7 @@ class Device:
if not self.online: # be very defensive if not self.online: # be very defensive
try: try:
self.ping() self.ping()
except _base.NoSuchDevice: except exceptions.NoSuchDevice:
pass pass
if self.online and self.protocol >= 2.0: if self.online and self.protocol >= 2.0:
self._name = _hidpp20.get_name(self) self._name = _hidpp20.get_name(self)
@ -513,7 +514,7 @@ class Device:
def __str__(self): def __str__(self):
try: try:
name = self.name or self.codename or '?' name = self.name or self.codename or '?'
except _base.NoSuchDevice: except exceptions.NoSuchDevice:
name = 'name not available' name = 'name not available'
return '<Device(%d,%s,%s,%s)>' % (self.number, self.wpid or self.product_id, name, self.serial) return '<Device(%d,%s,%s,%s)>' % (self.number, self.wpid or self.product_id, name, self.serial)

View File

@ -0,0 +1,33 @@
from .common import KwException as _KwException
#
# Exceptions that may be raised by this API.
#
class NoReceiver(_KwException):
"""Raised when trying to talk through a previously open handle, when the
receiver is no longer available. Should only happen if the receiver is
physically disconnected from the machine, or its kernel driver module is
unloaded."""
pass
class NoSuchDevice(_KwException):
"""Raised when trying to reach a device number not paired to the receiver."""
pass
class DeviceUnreachable(_KwException):
"""Raised when a request is made to an unreachable (turned off) device."""
pass
class FeatureNotSupported(_KwException):
"""Raised when trying to request a feature not supported by the device."""
pass
class FeatureCallError(_KwException):
"""Raised if the device replied to a feature call with an error."""
pass

View File

@ -28,10 +28,9 @@ from typing import List, Optional
import yaml as _yaml import yaml as _yaml
from . import special_keys from . import exceptions, special_keys
from .common import BATTERY_APPROX as _BATTERY_APPROX from .common import BATTERY_APPROX as _BATTERY_APPROX
from .common import FirmwareInfo as _FirmwareInfo from .common import FirmwareInfo as _FirmwareInfo
from .common import KwException as _KwException
from .common import NamedInt as _NamedInt from .common import NamedInt as _NamedInt
from .common import NamedInts as _NamedInts from .common import NamedInts as _NamedInts
from .common import UnsortedNamedInts as _UnsortedNamedInts from .common import UnsortedNamedInts as _UnsortedNamedInts
@ -228,21 +227,6 @@ ERROR = _NamedInts(
# #
class FeatureNotSupported(_KwException):
"""Raised when trying to request a feature not supported by the device."""
pass
class FeatureCallError(_KwException):
"""Raised if the device replied to a feature call with an error."""
pass
#
#
#
class FeaturesArray(dict): class FeaturesArray(dict):
def __init__(self, device): def __init__(self, device):
@ -463,8 +447,8 @@ class ReprogrammableKeyV4(ReprogrammableKey):
mapping_flags_2 = 0 mapping_flags_2 = 0
self._mapping_flags = mapping_flags_1 | (mapping_flags_2 << 8) self._mapping_flags = mapping_flags_1 | (mapping_flags_2 << 8)
else: else:
raise FeatureCallError(msg='No reply from device.') raise exceptions.FeatureCallError(msg='No reply from device.')
except FeatureCallError: # if the key hasn't ever been configured then the read may fail so only produce a warning except exceptions.FeatureCallError: # if the key hasn't ever been configured then the read may fail so only produce a warning
if logger.isEnabledFor(logging.WARNING): if logger.isEnabledFor(logging.WARNING):
logger.warn( logger.warn(
f'Feature Call Error in _getCidReporting on device {self._device} for cid {self._cid} - use defaults' f'Feature Call Error in _getCidReporting on device {self._device} for cid {self._cid} - use defaults'
@ -499,7 +483,7 @@ class ReprogrammableKeyV4(ReprogrammableKey):
bfield = 0 bfield = 0
for f, v in flags.items(): for f, v in flags.items():
if v and FLAG_TO_CAPABILITY[f] not in self.flags: if v and FLAG_TO_CAPABILITY[f] not in self.flags:
raise FeatureNotSupported( raise exceptions.FeatureNotSupported(
msg=f'Tried to set mapping flag "{f}" on control "{self.key}" ' + msg=f'Tried to set mapping flag "{f}" on control "{self.key}" ' +
f'which does not support "{FLAG_TO_CAPABILITY[f]}" on device {self._device}.' f'which does not support "{FLAG_TO_CAPABILITY[f]}" on device {self._device}.'
) )
@ -512,7 +496,7 @@ class ReprogrammableKeyV4(ReprogrammableKey):
self._mapping_flags &= ~int(f) self._mapping_flags &= ~int(f)
if remap != 0 and remap not in self.remappable_to: if remap != 0 and remap not in self.remappable_to:
raise FeatureNotSupported( raise exceptions.FeatureNotSupported(
msg=f'Tried to remap control "{self.key}" to a control ID {remap} which it is not remappable to ' + msg=f'Tried to remap control "{self.key}" to a control ID {remap} which it is not remappable to ' +
f'on device {self._device}.' f'on device {self._device}.'
) )
@ -1045,7 +1029,7 @@ class Spec:
def read(self): def read(self):
try: try:
value = feature_request(self._device, FEATURE.GESTURE_2, 0x50, self.id, 0xFF) value = feature_request(self._device, FEATURE.GESTURE_2, 0x50, self.id, 0xFF)
except FeatureCallError: # some calls produce an error (notably spec 5 multiplier on K400Plus) except exceptions.FeatureCallError: # some calls produce an error (notably spec 5 multiplier on K400Plus)
if logger.isEnabledFor(logging.WARNING): if logger.isEnabledFor(logging.WARNING):
logger.warn(f'Feature Call Error reading Gesture Spec on device {self._device} for spec {self.id} - use None') logger.warn(f'Feature Call Error reading Gesture Spec on device {self._device} for spec {self.id} - use None')
return None return None
@ -1130,7 +1114,7 @@ class Backlight:
def __init__(self, device): def __init__(self, device):
response = device.feature_request(FEATURE.BACKLIGHT2, 0x00) response = device.feature_request(FEATURE.BACKLIGHT2, 0x00)
if not response: if not response:
raise FeatureCallError(msg='No reply from device.') raise exceptions.FeatureCallError(msg='No reply from device.')
self.device = device self.device = device
self.enabled, self.options, supported, effects, self.level, self.dho, self.dhi, self.dpow = _unpack( self.enabled, self.options, supported, effects, self.level, self.dho, self.dhi, self.dpow = _unpack(
'<BBBHBHHH', response[:12] '<BBBHBHHH', response[:12]
@ -1819,7 +1803,7 @@ def get_adc_measurement(device):
report = feature_request(device, FEATURE.ADC_MEASUREMENT) report = feature_request(device, FEATURE.ADC_MEASUREMENT)
if report is not None: if report is not None:
return decipher_adc_measurement(report) return decipher_adc_measurement(report)
except FeatureCallError: except exceptions.FeatureCallError:
return FEATURE.ADC_MEASUREMENT if FEATURE.ADC_MEASUREMENT in device.features else None return FEATURE.ADC_MEASUREMENT if FEATURE.ADC_MEASUREMENT in device.features else None

View File

@ -20,6 +20,7 @@ import logging
import threading as _threading import threading as _threading
from . import base as _base from . import base as _base
from . import exceptions
# from time import time as _timestamp # from time import time as _timestamp
@ -163,7 +164,7 @@ class EventsListener(_threading.Thread):
if self._queued_notifications.empty(): if self._queued_notifications.empty():
try: try:
n = _base.read(self.receiver.handle, _EVENT_READ_TIMEOUT) n = _base.read(self.receiver.handle, _EVENT_READ_TIMEOUT)
except _base.NoReceiver: except exceptions.NoReceiver:
logger.warning('%s disconnected', self.receiver.name) logger.warning('%s disconnected', self.receiver.name)
self.receiver.close() self.receiver.close()
break break

View File

@ -22,15 +22,17 @@ import logging
import hidapi as _hid import hidapi as _hid
from . import base as _base from . import base as _base
from . import exceptions
from . import hidpp10 as _hidpp10 from . import hidpp10 as _hidpp10
from . import hidpp10_constants as _hidpp10_constants
from .base import product_information as _product_information from .base import product_information as _product_information
from .common import strhex as _strhex from .common import strhex as _strhex
from .device import Device from .device import Device
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_R = _hidpp10.REGISTERS _R = _hidpp10_constants.REGISTERS
_IR = _hidpp10.INFO_SUBREGISTERS _IR = _hidpp10_constants.INFO_SUBREGISTERS
# #
# #
@ -120,9 +122,9 @@ class Receiver:
if enable: if enable:
set_flag_bits = ( set_flag_bits = (
_hidpp10.NOTIFICATION_FLAG.battery_status _hidpp10_constants.NOTIFICATION_FLAG.battery_status
| _hidpp10.NOTIFICATION_FLAG.wireless | _hidpp10_constants.NOTIFICATION_FLAG.wireless
| _hidpp10.NOTIFICATION_FLAG.software_present | _hidpp10_constants.NOTIFICATION_FLAG.software_present
) )
else: else:
set_flag_bits = 0 set_flag_bits = 0
@ -132,7 +134,7 @@ class Receiver:
return None return None
flag_bits = _hidpp10.get_notification_flags(self) flag_bits = _hidpp10.get_notification_flags(self)
flag_names = None if flag_bits is None else tuple(_hidpp10.NOTIFICATION_FLAG.flag_names(flag_bits)) flag_names = None if flag_bits is None else tuple(_hidpp10_constants.NOTIFICATION_FLAG.flag_names(flag_bits))
if logger.isEnabledFor(logging.INFO): if logger.isEnabledFor(logging.INFO):
logger.info('%s: receiver notifications %s => %s', self, 'enabled' if enable else 'disabled', flag_names) logger.info('%s: receiver notifications %s => %s', self, 'enabled' if enable else 'disabled', flag_names)
return flag_bits return flag_bits
@ -154,33 +156,33 @@ class Receiver:
pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n) pair_info = self.read_register(_R.receiver_info, _IR.bolt_pairing_information + n)
if pair_info: if pair_info:
wpid = _strhex(pair_info[3:4]) + _strhex(pair_info[2:3]) wpid = _strhex(pair_info[3:4]) + _strhex(pair_info[2:3])
kind = _hidpp10.DEVICE_KIND[ord(pair_info[1:2]) & 0x0F] kind = _hidpp10_constants.DEVICE_KIND[ord(pair_info[1:2]) & 0x0F]
return wpid, kind, 0 return wpid, kind, 0
else: else:
raise _base.NoSuchDevice(number=n, receiver=self, error='read Bolt wpid') raise exceptions.NoSuchDevice(number=n, receiver=self, error='read Bolt wpid')
wpid = 0 wpid = 0
kind = None kind = None
polling_rate = None polling_rate = None
pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1) pair_info = self.read_register(_R.receiver_info, _IR.pairing_information + n - 1)
if pair_info: # may be either a Unifying receiver, or an Unifying-ready receiver if pair_info: # may be either a Unifying receiver, or an Unifying-ready receiver
wpid = _strhex(pair_info[3:5]) wpid = _strhex(pair_info[3:5])
kind = _hidpp10.DEVICE_KIND[ord(pair_info[7:8]) & 0x0F] kind = _hidpp10_constants.DEVICE_KIND[ord(pair_info[7:8]) & 0x0F]
polling_rate = str(ord(pair_info[2:3])) + 'ms' polling_rate = str(ord(pair_info[2:3])) + 'ms'
elif self.receiver_kind == '27Mz': # 27Mhz receiver, fill extracting WPID from udev path elif self.receiver_kind == '27Mz': # 27Mhz receiver, fill extracting WPID from udev path
wpid = _hid.find_paired_node_wpid(self.path, n) wpid = _hid.find_paired_node_wpid(self.path, n)
if not wpid: if not wpid:
logger.error('Unable to get wpid from udev for device %d of %s', n, self) logger.error('Unable to get wpid from udev for device %d of %s', n, self)
raise _base.NoSuchDevice(number=n, receiver=self, error='Not present 27Mhz device') raise exceptions.NoSuchDevice(number=n, receiver=self, error='Not present 27Mhz device')
kind = _hidpp10.DEVICE_KIND[self.get_kind_from_index(n)] kind = _hidpp10.DEVICE_KIND[self.get_kind_from_index(n)]
elif not self.receiver_kind == 'unifying': # unifying protocol not supported, may be an old Nano receiver elif not self.receiver_kind == 'unifying': # unifying protocol not supported, may be an old Nano receiver
device_info = self.read_register(_R.receiver_info, 0x04) device_info = self.read_register(_R.receiver_info, 0x04)
if device_info: if device_info:
wpid = _strhex(device_info[3:5]) wpid = _strhex(device_info[3:5])
kind = _hidpp10.DEVICE_KIND[0x00] # unknown kind kind = _hidpp10_constants.DEVICE_KIND[0x00] # unknown kind
else: else:
raise _base.NoSuchDevice(number=n, receiver=self, error='read pairing information - non-unifying') raise exceptions.NoSuchDevice(number=n, receiver=self, error='read pairing information - non-unifying')
else: else:
raise _base.NoSuchDevice(number=n, receiver=self, error='read pairing information') raise exceptions.NoSuchDevice(number=n, receiver=self, error='read pairing information')
return wpid, kind, polling_rate return wpid, kind, polling_rate
def device_extended_pairing_information(self, n): def device_extended_pairing_information(self, n):
@ -217,7 +219,7 @@ class Receiver:
kind = 3 kind = 3
else: # unknown device number on 27Mhz receiver else: # unknown device number on 27Mhz receiver
logger.error('failed to calculate device kind for device %d of %s', index, self) logger.error('failed to calculate device kind for device %d of %s', index, self)
raise _base.NoSuchDevice(number=index, receiver=self, error='Unknown 27Mhz device number') raise exceptions.NoSuchDevice(number=index, receiver=self, error='Unknown 27Mhz device number')
return kind return kind
def notify_devices(self): def notify_devices(self):
@ -239,7 +241,7 @@ class Receiver:
logger.info('%s: found new device %d (%s)', self, number, dev.wpid) logger.info('%s: found new device %d (%s)', self, number, dev.wpid)
self._devices[number] = dev self._devices[number] = dev
return dev return dev
except _base.NoSuchDevice as e: except exceptions.NoSuchDevice as e:
logger.warning('register new device failed for %s device %d error %s', e.receiver, e.number, e.error) logger.warning('register new device failed for %s device %d error %s', e.receiver, e.number, e.error)
logger.warning('%s: looked for device %d, not found', self, number) logger.warning('%s: looked for device %d, not found', self, number)

View File

@ -16,7 +16,7 @@
## with this program; if not, write to the Free Software Foundation, Inc., ## with this program; if not, write to the Free Software Foundation, Inc.,
## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. ## 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
from logitech_receiver import base as _base from logitech_receiver import exceptions
from logitech_receiver import hidpp10 as _hidpp10 from logitech_receiver import hidpp10 as _hidpp10
from logitech_receiver import hidpp20 as _hidpp20 from logitech_receiver import hidpp20 as _hidpp20
from logitech_receiver import receiver as _receiver from logitech_receiver import receiver as _receiver
@ -85,7 +85,7 @@ def _print_device(dev, num=None):
# try to ping the device to see if it actually exists and to wake it up # try to ping the device to see if it actually exists and to wake it up
try: try:
dev.ping() dev.ping()
except _base.NoSuchDevice: except exceptions.NoSuchDevice:
print(' %s: Device not found' % num or dev.number) print(' %s: Device not found' % num or dev.number)
return return

View File

@ -28,7 +28,8 @@ import logitech_receiver.device as _device
import logitech_receiver.receiver as _receiver import logitech_receiver.receiver as _receiver
from logitech_receiver import base as _base from logitech_receiver import base as _base
from logitech_receiver import hidpp10 as _hidpp10 from logitech_receiver import exceptions
from logitech_receiver import hidpp10_constants as _hidpp10_constants
from logitech_receiver import listener as _listener from logitech_receiver import listener as _listener
from logitech_receiver import notifications as _notifications from logitech_receiver import notifications as _notifications
from logitech_receiver import status as _status from logitech_receiver import status as _status
@ -42,8 +43,8 @@ from gi.repository import GLib # NOQA: E402 # isort:skip
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
_R = _hidpp10.REGISTERS _R = _hidpp10_constants.REGISTERS
_IR = _hidpp10.INFO_SUBREGISTERS _IR = _hidpp10_constants.INFO_SUBREGISTERS
# #
# #
@ -89,7 +90,7 @@ class ReceiverListener(_listener.EventsListener):
logger.info('%s: notifications listener has started (%s)', self.receiver, self.receiver.handle) logger.info('%s: notifications listener has started (%s)', self.receiver, self.receiver.handle)
nfs = self.receiver.enable_connection_notifications() nfs = self.receiver.enable_connection_notifications()
if logger.isEnabledFor(logging.WARNING): if logger.isEnabledFor(logging.WARNING):
if not self.receiver.isDevice and not ((nfs if nfs else 0) & _hidpp10.NOTIFICATION_FLAG.wireless): if not self.receiver.isDevice and not ((nfs if nfs else 0) & _hidpp10_constants.NOTIFICATION_FLAG.wireless):
logger.warning( logger.warning(
'Receiver on %s might not support connection notifications, GUI might not show its devices', 'Receiver on %s might not support connection notifications, GUI might not show its devices',
self.receiver.path self.receiver.path
@ -403,7 +404,7 @@ def _process_add(device_info, retry):
_error_callback('permissions', device_info.path) _error_callback('permissions', device_info.path)
else: else:
_error_callback('nodevice', device_info.path) _error_callback('nodevice', device_info.path)
except _base.NoReceiver: except exceptions.NoReceiver:
_error_callback('nodevice', device_info.path) _error_callback('nodevice', device_info.path)