base: add a number of relevant device ids and print them when the "show" command is executed

This commit is contained in:
effective-light 2020-08-10 02:54:11 -04:00 committed by Peter F. Patel-Schneider
parent 16823092bc
commit b98033d558
6 changed files with 107 additions and 71 deletions

View File

@ -33,6 +33,7 @@ import hidapi as _hid
from . import hidpp10 as _hidpp10
from . import hidpp20 as _hidpp20
from .base_usb import ALL as _RECEIVER_USB_IDS
from .base_usb import WIRED_DEVICES as _WIRED_DEVICE_IDS
from .common import KwException as _KwException
from .common import pack as _pack
from .common import strhex as _strhex
@ -94,6 +95,10 @@ def receivers():
for d in _hid.enumerate(receiver_usb_id):
yield d
def wired_devices():
for device_usb_id in _WIRED_DEVICE_IDS:
for dev in _hid.enumerate(device_usb_id):
yield dev
def notify_on_receivers_glib(callback):
"""Watch for matching devices and notifies the callback on the GLib thread."""

View File

@ -99,6 +99,12 @@ _ex100_receiver = lambda product_id: {
'ex100_wpid_fix': True
}
_wired_device = lambda product_id: {
'vendor_id': 0x046d,
'product_id': product_id,
'usb_interface': 2
}
# standard Unifying receivers (marked with the orange Unifying logo)
UNIFYING_RECEIVER_C52B = _unifying_receiver(0xc52b)
UNIFYING_RECEIVER_C532 = _unifying_receiver(0xc532)
@ -130,7 +136,16 @@ LIGHTSPEED_RECEIVER_C53d = _lightspeed_receiver(0xc53d)
LIGHTSPEED_RECEIVER_C545 = _lightspeed_receiver(0xc545)
LIGHTSPEED_RECEIVER_C541 = _lightspeed_receiver(0xc541)
del _DRIVER, _unifying_receiver, _nano_receiver, _lenovo_receiver, _lightspeed_receiver
# Wired devices
WIRED_DEVICE_C081 = _wired_device(0xc081)
WIRED_DEVICE_C082 = _wired_device(0xc082)
WIRED_DEVICE_C086 = _wired_device(0xc086)
WIRED_DEVICE_C087 = _wired_device(0xc087)
WIRED_DEVICE_C088 = _wired_device(0xc088)
WIRED_DEVICE_C090 = _wired_device(0xc090)
WIRED_DEVICE_C091 = _wired_device(0xc091)
del _DRIVER, _unifying_receiver, _nano_receiver, _lenovo_receiver, _lightspeed_receiver, _wired_device
ALL = (
UNIFYING_RECEIVER_C52B,
@ -156,6 +171,15 @@ ALL = (
LIGHTSPEED_RECEIVER_C541,
)
WIRED_DEVICES = (
WIRED_DEVICE_C081,
WIRED_DEVICE_C082,
WIRED_DEVICE_C086,
WIRED_DEVICE_C087,
WIRED_DEVICE_C088,
WIRED_DEVICE_C090,
WIRED_DEVICE_C091,
)
def product_information(usb_id):
if isinstance(usb_id, str):

View File

@ -30,7 +30,7 @@ class Device(object):
def __init__(self, receiver, number, link_notification=None, info=None):
assert receiver or info
self.receiver = receiver
if receiver:
assert number > 0 and number <= receiver.max_devices
else:
@ -68,6 +68,7 @@ class Device(object):
self.handle = None
self.path = None
self.product_id = None
# if _log.isEnabledFor(_DEBUG):
# _log.debug("new Device(%s, %s, %s)", receiver, number, link_notification)
@ -75,7 +76,7 @@ class Device(object):
if receiver:
if link_notification is not None:
self.online = not bool(ord(link_notification.data[0:1]) & 0x40)
self.wpid = _strhex(link_notification.data[2:3] \
self.wpid = _strhex(link_notification.data[2:3]
+ link_notification.data[1:2])
# assert link_notification.address == (0x04
# if unifying else 0x03)
@ -91,8 +92,7 @@ class Device(object):
self._kind = _hidpp10.DEVICE_KIND[kind]
else:
# force a reading of the wpid
pair_info = self.receiver.read_register(_R.receiver_info, \
0x20 + number - 1)
pair_info = self.receiver.read_register(_R.receiver_info, 0x20 + number - 1)
if pair_info:
# may be either a Unifying receiver, or an Unifying-ready
# receiver
@ -112,15 +112,13 @@ class Device(object):
self._kind = _hidpp10.DEVICE_KIND[1]
else: # unknown device number on EX100
_log.error('failed to set fake EX100 wpid for device %d of %s', number, receiver)
raise _base.NoSuchDevice(number=number, \
receiver=receiver, error='Unknown EX100 device')
raise _base.NoSuchDevice(number=number, receiver=receiver, error='Unknown EX100 device')
else:
# unifying protocol not supported, must be a Nano receiver
device_info = self.receiver.read_register(_R.receiver_info,
0x04)
device_info = self.receiver.read_register(_R.receiver_info, 0x04)
if device_info is None:
_log.error('failed to read Nano wpid for device %d of %s', number, receiver)
raise _base.NoSuchDevice(number=number, \
raise _base.NoSuchDevice(number=number,
receiver=receiver, error='read Nano wpid')
self.wpid = _strhex(device_info[3:5])
@ -129,11 +127,9 @@ class Device(object):
# the wpid is necessary to properly identify wireless link on/off
# notifications also it gets set to None on this object when the
# device is unpaired
assert self.wpid is not None, \
'failed to read wpid: device %d of %s' % (number, receiver)
assert self.wpid is not None, 'failed to read wpid: device %d of %s' % (number, receiver)
for dev in _hid.enumerate({'vendor_id': 0x046d, \
'product_id': int(self.receiver.product_id, 16)}):
for dev in _hid.enumerate({'vendor_id': 0x046d, 'product_id': int(self.receiver.product_id, 16)}):
if dev.serial and dev.serial.startswith(self.wpid):
self.path = dev.path
self.handle = _hid.open_path(dev.path)
@ -144,8 +140,7 @@ class Device(object):
if self.descriptor is None:
# Last chance to correctly identify the device; many Nano
# receivers do not support this call.
codename = self.receiver.read_register(_R.receiver_info, \
0x40 + self.number - 1)
codename = self.receiver.read_register(_R.receiver_info, 0x40 + self.number - 1)
if codename:
codename_length = ord(codename[1:2])
codename = codename[2:2 + codename_length]
@ -162,10 +157,10 @@ class Device(object):
else:
self.path = info.path
self.handle = _hid.open_path(self.path)
self.product_id = info.product_id
if self._protocol is not None:
self.features = None if self._protocol < 2.0 else \
_hidpp20.FeaturesArray(self)
self.features = None if self._protocol < 2.0 else _hidpp20.FeaturesArray(self)
else:
# may be a 2.0 device; if not, it will fix itself later
self.features = _hidpp20.FeaturesArray(self)
@ -184,8 +179,7 @@ class Device(object):
@property
def codename(self):
if not self._codename:
codename = self.receiver.read_register(_R.receiver_info, \
0x40 + self.number - 1) if self.receiver else None
codename = self.receiver.read_register(_R.receiver_info, 0x40 + self.number - 1) if self.receiver else None
if codename:
codename_length = ord(codename[1:2])
codename = codename[2:2 + codename_length]
@ -206,8 +200,7 @@ class Device(object):
@property
def kind(self):
if not self._kind:
pair_info = self.receiver.read_register(_R.receiver_info, \
0x20 + self.number - 1) if self.receiver else None
pair_info = self.receiver.read_register(_R.receiver_info, 0x20 + self.number - 1) if self.receiver else None
if pair_info:
kind = ord(pair_info[7:8]) & 0x0F
self._kind = _hidpp10.DEVICE_KIND[kind]
@ -227,8 +220,7 @@ class Device(object):
@property
def serial(self):
if not self._serial and self.receiver:
serial = self.receiver.read_register(_R.receiver_info, \
0x30 + self.number - 1)
serial = self.receiver.read_register(_R.receiver_info, 0x30 + self.number - 1)
if serial:
ps = ord(serial[9:10]) & 0x0F
self._power_switch = _hidpp10.POWER_SWITCH_LOCATION[ps]
@ -246,8 +238,7 @@ class Device(object):
@property
def power_switch_location(self):
if not self._power_switch and self.receiver:
ps = self.receiver.read_register(_R.receiver_info, \
0x30 + self.number - 1)
ps = self.receiver.read_register(_R.receiver_info, 0x30 + self.number - 1)
if ps:
ps = ord(ps[9:10]) & 0x0F
self._power_switch = _hidpp10.POWER_SWITCH_LOCATION[ps]
@ -258,14 +249,12 @@ class Device(object):
@property
def polling_rate(self):
if not self._polling_rate and self.receiver:
pair_info = self.receiver.read_register(_R.receiver_info, \
0x20 + self.number - 1)
pair_info = self.receiver.read_register(_R.receiver_info, 0x20 + self.number - 1)
if pair_info:
self._polling_rate = ord(pair_info[2:3])
else:
self._polling_rate = 0
if self.online and self.protocol >= 2.0 and self.features \
and _hidpp20.FEATURE.REPORT_RATE in self.features:
if self.online and self.protocol >= 2.0 and self.features and _hidpp20.FEATURE.REPORT_RATE in self.features:
rate = _hidpp20.get_polling_rate(self)
self._polling_rate = rate if rate else self._polling_rate
return self._polling_rate
@ -302,8 +291,7 @@ class Device(object):
if setting is not None:
self._settings.append(setting)
if not self._feature_settings_checked:
self._feature_settings_checked = _check_feature_settings(self, \
self._settings)
self._feature_settings_checked = _check_feature_settings(self, self._settings)
return self._settings
def enable_notifications(self, enable=True):
@ -323,26 +311,21 @@ class Device(object):
set_flag_bits = 0
ok = _hidpp10.set_notification_flags(self, set_flag_bits)
if not ok:
_log.warn('%s: failed to %s device notifications', self, 'enable' \
if enable else 'disable')
_log.warn('%s: failed to %s device notifications', self, 'enable' if enable else 'disable')
flag_bits = _hidpp10.get_notification_flags(self)
flag_names = None if flag_bits is None else \
tuple(_hidpp10.NOTIFICATION_FLAG.flag_names(flag_bits))
flag_names = None if flag_bits is None else tuple(_hidpp10.NOTIFICATION_FLAG.flag_names(flag_bits))
if _log.isEnabledFor(_INFO):
_log.info('%s: device notifications %s %s', self, 'enabled' \
if enable else 'disabled', flag_names)
_log.info('%s: device notifications %s %s', self, 'enabled' if enable else 'disabled', flag_names)
return flag_bits if ok else None
def request(self, request_id, *params, no_reply=False):
return _base.request(self.handle, self.number, request_id, *params, \
no_reply=no_reply)
return _base.request(self.handle, self.number, request_id, *params, no_reply=no_reply)
def feature_request(self, feature, function=0x00, *params, no_reply=False):
if self.protocol >= 2.0:
return _hidpp20.feature_request(self, feature, function, *params, \
no_reply=no_reply)
return _hidpp20.feature_request(self, feature, function, *params, no_reply=no_reply)
def ping(self):
"""Checks if the device is online, returns True of False"""
@ -358,21 +341,19 @@ class Device(object):
__int__ = __index__
def __eq__(self, other):
return other is not None and self.kind == other.kind \
and self.wpid == other.wpid
return other is not None and self.kind == other.kind and self.wpid == other.wpid
def __ne__(self, other):
return other is None or self.kind != other.kind \
or self.wpid != other.wpid
return other is None or self.kind != other.kind or self.wpid != other.wpid
def __hash__(self):
return self.wpid.__hash__()
__bool__ = __nonzero__ = lambda self: self.wpid is not None \
and self.number in self.receiver
__bool__ = __nonzero__ = lambda self: self.wpid is not None and self.number in self.receiver
def __str__(self):
return '<Device(%d,%s,%s,%s)>' % (self.number, self.wpid, \
return '<Device(%d,%s,%s,%s)>' % (self.number,
self.wpid or self.product_id,
self.name or self.codename or '?', self.serial)
__unicode__ = __repr__ = __str__

View File

@ -28,11 +28,8 @@ from .device import Device
from . import base as _base
from . import hidpp10 as _hidpp10
from . import hidpp20 as _hidpp20
from .base_usb import product_information as _product_information
from .common import strhex as _strhex
from .descriptors import DEVICES as _DESCRIPTORS
from .i18n import _
_log = getLogger(__name__)
del getLogger

View File

@ -109,6 +109,22 @@ def _receivers(dev_path=None):
_log.exception('opening ' + str(dev_info))
_sys.exit('%s: error: %s' % (NAME, str(e)))
def _wired_devices(dev_path=None):
from logitech_receiver import Device
from logitech_receiver.base import wired_devices
for dev_info in wired_devices():
if dev_path is not None and dev_path != dev_info.path:
continue
try:
d = Device(None, 0, info=dev_info)
if _log.isEnabledFor(_DEBUG):
_log.debug('[%s] => %s', dev_info.path, d)
if d is not None:
yield d
except Exception as e:
_log.exception('opening ' + str(dev_info))
_sys.exit('%s: error: %s' % (NAME, str(e)))
def _find_receiver(receivers, name):
assert receivers
@ -169,7 +185,12 @@ def run(cli_args=None, hidraw_path=None):
try:
c = list(_receivers(hidraw_path))
if not c:
raise Exception('Logitech receiver not found')
if action != 'show':
raise Exception('Logitech receiver not found')
else:
c += list(_wired_devices(hidraw_path))
if not c:
raise Exception('No devices found')
from importlib import import_module
m = import_module('.' + action, package=__name__)

View File

@ -23,6 +23,7 @@ from logitech_receiver import hidpp10 as _hidpp10
from logitech_receiver import hidpp20 as _hidpp20
from logitech_receiver import settings_templates as _settings_templates
from logitech_receiver.common import NamedInt as _NamedInt
from logitech_receiver import receiver as _receiver
def _print_receiver(receiver):
@ -64,16 +65,16 @@ def _battery_text(level):
return '%d%%' % level
def _print_device(dev):
def _print_device(dev, num=None):
assert dev is not None
# check if the device is online
dev.ping()
print(' %d: %s' % (dev.number, dev.name))
print(' %d: %s' % (num or dev.number, dev.name))
print(' Device path :', dev.path)
print(' USB id : 046d:%s' % (dev.wpid or dev.product_id))
print(' Codename :', dev.codename)
print(' Kind :', dev.kind)
print(' Wireless PID :', dev.wpid)
if dev.protocol:
print(' Protocol : HID++ %1.1f' % dev.protocol)
else:
@ -222,31 +223,38 @@ def _print_device(dev):
print(' Battery: unknown (device is offline).')
def run(receivers, args, find_receiver, find_device):
assert receivers
def run(devices, args, find_receiver, find_device):
assert devices
assert args.device
device_name = args.device.lower()
if device_name == 'all':
for r in receivers:
_print_receiver(r)
count = r.count()
if count:
for dev in r:
print('')
_print_device(dev)
count -= 1
if not count:
break
print('')
dev_num = 1
for d in devices:
if isinstance(d, _receiver.Receiver):
_print_receiver(d)
count = d.count()
if count:
for dev in d:
print('')
_print_device(dev)
count -= 1
if not count:
break
print('')
else:
if dev_num == 1:
print('Wired Devices')
_print_device(d, num=dev_num)
dev_num += 1
return
dev = find_receiver(receivers, device_name)
dev = find_receiver(devices, device_name)
if dev:
_print_receiver(dev)
return
dev = find_device(receivers, device_name)
dev = find_device(devices, device_name)
assert dev
_print_device(dev)